This is an automated email from the ASF dual-hosted git repository.

zhfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new bb35cc4ec55 CAMEL-15016 support kafka transactions (#7454)
bb35cc4ec55 is described below

commit bb35cc4ec552688a5a7d966da6ef3c41aca4889f
Author: Amos Feng <[email protected]>
AuthorDate: Wed Apr 27 23:05:11 2022 +0800

    CAMEL-15016 support kafka transactions (#7454)
    
    * CAMEL-15016: camel-kafka - support kafka transaction
    
    * CAMEL-15016: camel-kafka - only abort a transaction if the exception is 
not KafkaException
---
 .../camel/component/kafka/KafkaProducer.java       |  62 ++++++
 .../kafka/integration/KafkaTransactionIT.java      | 232 +++++++++++++++++++++
 2 files changed, 294 insertions(+)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4e6f76a6463..88f1f0a7e81 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -39,15 +39,18 @@ import 
org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
@@ -64,6 +67,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     private KafkaProducerHealthCheck producerHealthCheck;
     private KafkaHealthCheckRepository healthCheckRepository;
     private String clientId;
+    private String transactionId;
     private final KafkaEndpoint endpoint;
     private final KafkaConfiguration configuration;
     private ExecutorService workerPool;
@@ -155,6 +159,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
             createProducer(props);
         }
 
+        // init kafka transaction
+        transactionId = 
props.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+        if (transactionId != null) {
+            kafkaProducer.initTransactions();
+        }
+
         // if we are in asynchronous mode we need a worker pool
         if (!configuration.isSynchronous() && workerPool == null) {
             workerPool = endpoint.createProducerExecutor();
@@ -365,6 +375,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
         // is the message body a list or something that contains multiple 
values
         Message message = exchange.getIn();
 
+        if (transactionId != null) {
+            startKafkaTransaction(exchange);
+        }
+
         if (isIterable(message.getBody())) {
             processIterableSync(exchange, message);
         } else {
@@ -438,6 +452,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Message message = exchange.getMessage();
         Object body = message.getBody();
 
+        if (transactionId != null) {
+            startKafkaTransaction(exchange);
+        }
+
         try {
             // is the message body a list or something that contains multiple 
values
             if (isIterable(body)) {
@@ -492,4 +510,48 @@ public class KafkaProducer extends DefaultAsyncProducer {
             kafkaProducer.send(record, cb);
         }
     }
+
+    private void startKafkaTransaction(Exchange exchange) {
+        exchange.getUnitOfWork().beginTransactedBy(transactionId);
+        kafkaProducer.beginTransaction();
+        exchange.getUnitOfWork().addSynchronization(new 
KafkaTransactionSynchronization(transactionId, kafkaProducer));
+    }
+}
+
+class KafkaTransactionSynchronization extends SynchronizationAdapter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
+    private String transactionId;
+    private Producer kafkaProducer;
+
+    public KafkaTransactionSynchronization(String transactionId, Producer 
kafkaProducer) {
+        this.transactionId = transactionId;
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        try {
+            if (exchange.getException() != null || exchange.isRollbackOnly()) {
+                if (exchange.getException() instanceof KafkaException) {
+                    LOG.warn("Catch {} and will close kafka producer with 
transaction {} ", exchange.getException(),
+                            transactionId);
+                    kafkaProducer.close();
+                } else {
+                    LOG.warn("Abort kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
+                    kafkaProducer.abortTransaction();
+                }
+            } else {
+                LOG.debug("Commit kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
+                kafkaProducer.commitTransaction();
+            }
+        } catch (Throwable t) {
+            exchange.setException(t);
+            if (!(t instanceof KafkaException)) {
+                LOG.warn("Abort kafka transaction {} with exchange {} due to 
{} ", transactionId, exchange.getExchangeId(), t);
+                kafkaProducer.abortTransaction();
+            }
+        } finally {
+            exchange.getUnitOfWork().endTransactedBy(transactionId);
+        }
+    }
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
new file mode 100644
index 00000000000..eefabf142c8
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Integration test for transactional Kafka producers, covering both a single
 * synchronous route (direct:) and concurrent production through a seda: route.
 * Each producer endpoint enables transactions via the
 * additional-properties[transactional.id] option.
 */
public class KafkaTransactionIT extends BaseEmbeddedKafkaTestSupport {
    private static final String TOPIC_TRANSACTION = "transaction";
    private static final String TOPIC_CONCURRENCY_TRANSACTION = "concurrency_transaction";
    // shared consumer used by both tests to read back the committed messages
    private static KafkaConsumer<String, String> stringsConsumerConn;
    private static final int THREAD_NUM = 5;

    // transactional producer endpoint: transactional.id triggers initTransactions()
    // in KafkaProducer; idempotence and retries are required for transactions
    @EndpointInject("kafka:" + TOPIC_TRANSACTION + "?requestRequiredAcks=-1"
                    + "&additional-properties[transactional.id]=1234"
                    + "&additional-properties[enable.idempotence]=true"
                    + "&additional-properties[retries]=5")
    private Endpoint toTransaction;

    // second transactional endpoint (distinct transactional.id) used from
    // multiple threads; synchronous=true keeps sends on the calling thread
    @EndpointInject("kafka:" + TOPIC_CONCURRENCY_TRANSACTION + "?requestRequiredAcks=-1&synchronous=true"
                    + "&additional-properties[transactional.id]=5678"
                    + "&additional-properties[enable.idempotence]=true"
                    + "&additional-properties[retries]=5")
    private Endpoint toConcurrencyTransaction;

    @EndpointInject("mock:kafkaAck")
    private MockEndpoint mockEndpoint;

    @Produce("direct:startTransaction")
    private ProducerTemplate testTransaction;

    @Produce("seda:startTransaction")
    private ProducerTemplate testConcurrencyTransaction;

    public KafkaTransactionIT() {

    }

    @BeforeAll
    public static void before() {
        stringsConsumerConn = createStringKafkaConsumer("DemoTransaction");
    }

    @AfterAll
    public static void after() {
        // clean all test topics
        final List<String> topics = new ArrayList<>();
        topics.add(TOPIC_TRANSACTION);
        topics.add(TOPIC_CONCURRENCY_TRANSACTION);
        kafkaAdminClient.deleteTopics(topics);
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // direct route: send to Kafka, then fail the exchange after the send
                // when the body contains "fail" — the transaction must be aborted
                from("direct:startTransaction").to(toTransaction)
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                String body = exchange.getIn().getBody(String.class);
                                if (body.contains("fail")) {
                                    throw new RuntimeException("fail process message " + body);
                                }
                            }
                        }).to(mockEndpoint);

                // seda route used by the concurrency test
                from("seda:startTransaction").to(toConcurrencyTransaction);
            }
        };
    }

    @Test
    public void concurrencyProducedTransactionMessage() throws InterruptedException {
        Thread[] threads = new Thread[THREAD_NUM];
        int messageInTopic = 5;

        // expect every message from every thread to be committed
        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic * THREAD_NUM);

        for (int i = 0; i < THREAD_NUM; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    sendMessagesInRoute(messageInTopic, testConcurrencyTransaction, "IT test concurrency transaction message",
                            KafkaConstants.PARTITION_KEY,
                            "0");
                }
            });
            threads[i].start();
        }

        // wait for all producer threads before consuming
        for (int i = 0; i < THREAD_NUM; i++) {
            threads[i].join();
        }

        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_CONCURRENCY_TRANSACTION, messagesLatch);

        // latch already counted down inside createKafkaMessageConsumer; this await
        // only confirms the count reached zero
        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());
    }

    // NOTE(review): "Massage" in the method name is a typo for "Message" — kept
    // as-is since renaming is out of scope for a documentation pass.
    @Test
    public void producedTransactionMassageIsReceivedByKafka() throws InterruptedException {
        int messageInTopic = 10;

        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic);

        // first batch succeeds and is committed
        sendMessagesInRoute(messageInTopic, testTransaction, "IT test transaction message", KafkaConstants.PARTITION_KEY, "0");
        // second batch fails in the route processor, so its transaction must be
        // aborted and its messages must never become visible to the consumer
        assertThrows(RuntimeException.class, new Executable() {
            @Override
            public void execute() throws Throwable {
                sendMessagesInRoute(messageInTopic, testTransaction, "IT test transaction fail message",
                        KafkaConstants.PARTITION_KEY,
                        "0");
            }
        });

        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_TRANSACTION, messagesLatch);

        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        // only the committed batch reaches the mock endpoint
        List<Exchange> exchangeList = mockEndpoint.getExchanges();
        assertEquals(10, exchangeList.size(), "Ten Exchanges are expected");
        for (Exchange exchange : exchangeList) {
            @SuppressWarnings("unchecked")
            List<RecordMetadata> recordMetaData1
                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
            assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
            assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
            assertTrue(recordMetaData1.get(0).topic().startsWith("transaction"), "Topic Name start with 'transaction'");
        }

    }

    /**
     * Creates a string-deserializing consumer against the embedded broker.
     * auto.offset.reset=earliest so messages produced before subscription are read.
     */
    private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId) {
        Properties stringsProps = new Properties();

        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new KafkaConsumer<>(stringsProps);
    }

    /**
     * Polls the topic, counting the latch down once per record, until the latch
     * reaches zero.
     * NOTE(review): if fewer messages than expected were committed this loop
     * never terminates — it relies on the surrounding test timeout.
     */
    private void createKafkaMessageConsumer(
            KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch) {

        consumerConn.subscribe(Arrays.asList(topic));
        boolean run = true;

        while (run) {
            ConsumerRecords<String, String> records = consumerConn.poll(Duration.ofMillis(100));
            for (int i = 0; i < records.count(); i++) {
                messagesLatch.countDown();
                if (messagesLatch.getCount() == 0) {
                    run = false;
                }
            }
        }
    }

    // varargs variant: headersWithValue is interpreted as alternating key/value pairs
    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
        Map<String, Object> headerMap = new HashMap<>();
        if (headersWithValue != null) {
            for (int i = 0; i < headersWithValue.length; i = i + 2) {
                headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
            }
        }
        sendMessagesInRoute(messages, template, bodyOther, headerMap);
    }

    // sends the same body/header combination the requested number of times
    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
        for (int k = 0; k < messages; k++) {
            template.sendBodyAndHeaders(bodyOther, headerMap);
        }
    }
}

Reply via email to