This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new bb35cc4ec55 CAMEL-15016 support kafka transactions (#7454)
bb35cc4ec55 is described below
commit bb35cc4ec552688a5a7d966da6ef3c41aca4889f
Author: Amos Feng <[email protected]>
AuthorDate: Wed Apr 27 23:05:11 2022 +0800
CAMEL-15016 support kafka transactions (#7454)
* CAMEL-15016: camel-kafka - support kafka transaction
* CAMEL-15016: camel-kafka - only abort a transaction if the exception is
not KafkaException
---
.../camel/component/kafka/KafkaProducer.java | 62 ++++++
.../kafka/integration/KafkaTransactionIT.java | 232 +++++++++++++++++++++
2 files changed, 294 insertions(+)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4e6f76a6463..88f1f0a7e81 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -39,15 +39,18 @@ import
org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
@@ -64,6 +67,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
private KafkaProducerHealthCheck producerHealthCheck;
private KafkaHealthCheckRepository healthCheckRepository;
private String clientId;
+ private String transactionId;
private final KafkaEndpoint endpoint;
private final KafkaConfiguration configuration;
private ExecutorService workerPool;
@@ -155,6 +159,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
createProducer(props);
}
+ // init kafka transaction
+ transactionId =
props.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ if (transactionId != null) {
+ kafkaProducer.initTransactions();
+ }
+
// if we are in asynchronous mode we need a worker pool
if (!configuration.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
@@ -365,6 +375,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
// is the message body a list or something that contains multiple
values
Message message = exchange.getIn();
+ if (transactionId != null) {
+ startKafkaTransaction(exchange);
+ }
+
if (isIterable(message.getBody())) {
processIterableSync(exchange, message);
} else {
@@ -438,6 +452,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
Message message = exchange.getMessage();
Object body = message.getBody();
+ if (transactionId != null) {
+ startKafkaTransaction(exchange);
+ }
+
try {
// is the message body a list or something that contains multiple
values
if (isIterable(body)) {
@@ -492,4 +510,48 @@ public class KafkaProducer extends DefaultAsyncProducer {
kafkaProducer.send(record, cb);
}
}
+
+    /**
+     * Marks the exchange's unit of work as transacted by {@code transactionId}, begins a Kafka transaction on
+     * this producer's Kafka client and registers a {@link KafkaTransactionSynchronization} that finishes the
+     * transaction once the exchange is done.
+     * <p>
+     * If the Kafka client refuses to begin the transaction, the transacted marker is removed again before the
+     * failure is rethrown. Without that cleanup the marker would stay on the unit of work, because the
+     * synchronization that normally clears it has not been registered at that point.
+     *
+     * @param exchange the exchange whose unit of work takes part in the Kafka transaction
+     */
+    private void startKafkaTransaction(Exchange exchange) {
+        exchange.getUnitOfWork().beginTransactedBy(transactionId);
+        try {
+            kafkaProducer.beginTransaction();
+        } catch (RuntimeException e) {
+            // no synchronization is registered yet, so undo the marker here before propagating the failure
+            exchange.getUnitOfWork().endTransactedBy(transactionId);
+            throw e;
+        }
+        exchange.getUnitOfWork().addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
+    }
+}
+
+/**
+ * Unit-of-work synchronization that finishes the Kafka transaction an exchange was produced in.
+ * <p>
+ * When the exchange is done without an exception and is not marked rollback-only, the transaction is committed.
+ * When the exchange failed with a {@link KafkaException}, the producer is closed; for any other failure or a
+ * rollback-only exchange the transaction is aborted. In every case the unit of work is released from the
+ * transaction id afterwards.
+ */
+class KafkaTransactionSynchronization extends SynchronizationAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
+    private final String transactionId;
+    private final Producer<?, ?> kafkaProducer;
+
+    /**
+     * @param transactionId the id the unit of work was marked as transacted by
+     * @param kafkaProducer the Kafka client on which the transaction was begun
+     */
+    public KafkaTransactionSynchronization(String transactionId, Producer<?, ?> kafkaProducer) {
+        this.transactionId = transactionId;
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        try {
+            if (exchange.getException() != null || exchange.isRollbackOnly()) {
+                if (exchange.getException() instanceof KafkaException) {
+                    LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(),
+                            transactionId);
+                    kafkaProducer.close();
+                } else {
+                    LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+                    kafkaProducer.abortTransaction();
+                }
+            } else {
+                LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+                kafkaProducer.commitTransaction();
+            }
+        } catch (KafkaException e) {
+            // a Kafka failure while finishing the transaction is only reported on the exchange, no abort is attempted
+            exchange.setException(e);
+        } catch (Exception e) {
+            exchange.setException(e);
+            // the trailing throwable argument makes the logger print the stack trace as well
+            LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(),
+                    e.toString(), e);
+            kafkaProducer.abortTransaction();
+        } finally {
+            // always release the unit of work, whatever happened to the Kafka transaction
+            exchange.getUnitOfWork().endTransactedBy(transactionId);
+        }
+    }
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
new file mode 100644
index 00000000000..eefabf142c8
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for producing to Kafka inside a Kafka transaction.
+ * <p>
+ * Two transactional producer endpoints are exercised: one fed from a direct route whose processor fails for
+ * bodies containing "fail", and one synchronous endpoint fed from a seda route by several sender threads.
+ * The records are read back with a plain Kafka consumer.
+ */
+public class KafkaTransactionIT extends BaseEmbeddedKafkaTestSupport {
+    private static final String TOPIC_TRANSACTION = "transaction";
+    private static final String TOPIC_CONCURRENCY_TRANSACTION = "concurrency_transaction";
+    private static final int THREAD_NUM = 5;
+    // upper bound for draining a topic, so that missing records fail the test instead of hanging it
+    private static final long CONSUME_TIMEOUT_SECONDS = 30;
+    private static KafkaConsumer<String, String> stringsConsumerConn;
+
+    @EndpointInject("kafka:" + TOPIC_TRANSACTION + "?requestRequiredAcks=-1"
+                    + "&additional-properties[transactional.id]=1234"
+                    + "&additional-properties[enable.idempotence]=true"
+                    + "&additional-properties[retries]=5")
+    private Endpoint toTransaction;
+
+    @EndpointInject("kafka:" + TOPIC_CONCURRENCY_TRANSACTION + "?requestRequiredAcks=-1&synchronous=true"
+                    + "&additional-properties[transactional.id]=5678"
+                    + "&additional-properties[enable.idempotence]=true"
+                    + "&additional-properties[retries]=5")
+    private Endpoint toConcurrencyTransaction;
+
+    @EndpointInject("mock:kafkaAck")
+    private MockEndpoint mockEndpoint;
+
+    @Produce("direct:startTransaction")
+    private ProducerTemplate testTransaction;
+
+    @Produce("seda:startTransaction")
+    private ProducerTemplate testConcurrencyTransaction;
+
+    public KafkaTransactionIT() {
+
+    }
+
+    @BeforeAll
+    public static void before() {
+        stringsConsumerConn = createStringKafkaConsumer("DemoTransaction");
+    }
+
+    @AfterAll
+    public static void after() {
+        // release the consumer created in before()
+        if (stringsConsumerConn != null) {
+            stringsConsumerConn.close();
+        }
+
+        // clean all test topics
+        final List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_TRANSACTION);
+        topics.add(TOPIC_CONCURRENCY_TRANSACTION);
+        kafkaAdminClient.deleteTopics(topics);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:startTransaction").to(toTransaction)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                // fail after the record was handed to the producer
+                                String body = exchange.getIn().getBody(String.class);
+                                if (body.contains("fail")) {
+                                    throw new RuntimeException("fail process message " + body);
+                                }
+                            }
+                        }).to(mockEndpoint);
+
+                from("seda:startTransaction").to(toConcurrencyTransaction);
+            }
+        };
+    }
+
+    @Test
+    public void concurrencyProducedTransactionMessage() throws InterruptedException {
+        Thread[] threads = new Thread[THREAD_NUM];
+        int messageInTopic = 5;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic * THREAD_NUM);
+
+        for (int i = 0; i < THREAD_NUM; i++) {
+            threads[i] = new Thread(
+                    () -> sendMessagesInRoute(messageInTopic, testConcurrencyTransaction,
+                            "IT test concurrency transaction message", KafkaConstants.PARTITION_KEY, "0"));
+            threads[i].start();
+        }
+
+        for (int i = 0; i < THREAD_NUM; i++) {
+            threads[i].join();
+        }
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_CONCURRENCY_TRANSACTION, messagesLatch);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue(allMessagesReceived,
+                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());
+    }
+
+    @Test
+    public void producedTransactionMassageIsReceivedByKafka() throws InterruptedException {
+        int messageInTopic = 10;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic);
+
+        sendMessagesInRoute(messageInTopic, testTransaction, "IT test transaction message", KafkaConstants.PARTITION_KEY,
+                "0");
+        // the route's processor rejects bodies containing "fail", so this send must throw
+        assertThrows(RuntimeException.class, new Executable() {
+            @Override
+            public void execute() throws Throwable {
+                sendMessagesInRoute(messageInTopic, testTransaction, "IT test transaction fail message",
+                        KafkaConstants.PARTITION_KEY,
+                        "0");
+            }
+        });
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_TRANSACTION, messagesLatch);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue(allMessagesReceived,
+                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());
+
+        List<Exchange> exchangeList = mockEndpoint.getExchanges();
+        assertEquals(10, exchangeList.size(), "Ten Exchanges are expected");
+        for (Exchange exchange : exchangeList) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> recordMetaData1
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
+            assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is not negative");
+            assertTrue(recordMetaData1.get(0).topic().startsWith("transaction"), "Topic Name start with 'transaction'");
+        }
+
+    }
+
+    /**
+     * Creates a String/String consumer for the embedded broker, starting from the earliest offset.
+     *
+     * @param groupId the consumer group id to join
+     */
+    private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId) {
+        Properties stringsProps = new Properties();
+
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return new KafkaConsumer<>(stringsProps);
+    }
+
+    /**
+     * Subscribes the consumer to the topic and counts the latch down once per polled record, until the latch
+     * reaches zero or {@link #CONSUME_TIMEOUT_SECONDS} have elapsed. The caller checks the latch afterwards, so
+     * a timeout surfaces as a failed assertion rather than a test that never returns.
+     */
+    private void createKafkaMessageConsumer(
+            KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic));
+        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(CONSUME_TIMEOUT_SECONDS);
+
+        while (messagesLatch.getCount() > 0 && System.nanoTime() < deadline) {
+            ConsumerRecords<String, String> records = consumerConn.poll(Duration.ofMillis(100));
+            for (int i = 0; i < records.count(); i++) {
+                messagesLatch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Sends the same body {@code messages} times; {@code headersWithValue} is read as alternating header name
+     * and header value entries.
+     */
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
+        Map<String, Object> headerMap = new HashMap<>();
+        if (headersWithValue != null) {
+            for (int i = 0; i < headersWithValue.length; i = i + 2) {
+                headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
+            }
+        }
+        sendMessagesInRoute(messages, template, bodyOther, headerMap);
+    }
+
+    /**
+     * Sends the same body and headers {@code messages} times through the given template.
+     */
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
+        for (int k = 0; k < messages; k++) {
+            template.sendBodyAndHeaders(bodyOther, headerMap);
+        }
+    }
+}