This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 92b3253a5b [Hotfix][Connector-V2][kafka] fix kafka sink config 
exactly-once exception (#7857)
92b3253a5b is described below

commit 92b3253a5b24b71c941974c06c433d9f3bace07a
Author: fcb-xiaobo <[email protected]>
AuthorDate: Mon Dec 9 10:37:56 2024 +0800

    [Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception 
(#7857)
---
 .../kafka/sink/KafkaInternalProducer.java          |  26 ++++-
 .../seatunnel/kafka/sink/KafkaSinkCommitter.java   |   5 +-
 .../kafka/sink/KafkaTransactionSender.java         |  15 ++-
 .../seatunnel/kafka/state/KafkaCommitInfo.java     |   1 +
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 114 +++++++++++++++++++++
 .../kafka/kafka_to_kafka_exactly_once_batch.conf   |  42 ++++++++
 .../kafka_to_kafka_exactly_once_streaming.conf     |  44 ++++++++
 7 files changed, 241 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 515610e9dd..33d2caeb93 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -55,11 +55,17 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
 
     @Override
     public void beginTransaction() throws ProducerFencedException {
+        if (log.isDebugEnabled()) {
+            log.debug("KafkaInternalProducer.beginTransaction. " + 
this.transactionalId);
+        }
         super.beginTransaction();
     }
 
     @Override
     public void commitTransaction() throws ProducerFencedException {
+        if (log.isDebugEnabled()) {
+            log.debug("KafkaInternalProducer.commitTransaction." + 
this.transactionalId);
+        }
         super.commitTransaction();
     }
 
@@ -69,7 +75,18 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
     }
 
     public void setTransactionalId(String transactionalId) {
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "KafkaInternalProducer.abortTransaction. Target 
transactionalId="
+                            + transactionalId);
+        }
         if (!transactionalId.equals(this.transactionalId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "KafkaInternalProducer.abortTransaction. Current 
transactionalId={} not match target transactionalId={}",
+                        this.transactionalId,
+                        transactionalId);
+            }
             Object transactionManager = getTransactionManager();
             synchronized (transactionManager) {
                 ReflectionUtils.setField(transactionManager, 
"transactionalId", transactionalId);
@@ -97,7 +114,7 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
         return (long) ReflectionUtils.getField(producerIdAndEpoch, 
"producerId").get();
     }
 
-    public void resumeTransaction(long producerId, short epoch) {
+    public void resumeTransaction(long producerId, short epoch, boolean 
txnStarted) {
 
         log.info(
                 "Attempting to resume transaction {} with producerId {} and 
epoch {}",
@@ -125,10 +142,15 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
             transitionTransactionManagerStateTo(transactionManager, "READY");
 
             transitionTransactionManagerStateTo(transactionManager, 
"IN_TRANSACTION");
-            ReflectionUtils.setField(transactionManager, "transactionStarted", 
true);
+            ReflectionUtils.setField(transactionManager, "transactionStarted", 
txnStarted);
         }
     }
 
+    public boolean isTxnStarted() {
+        Object transactionManager = getTransactionManager();
+        return (boolean) ReflectionUtils.getField(transactionManager, 
"transactionStarted").get();
+    }
+
     private static Object createProducerIdAndEpoch(long producerId, short 
epoch) {
         try {
             Field field =
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index ed4e280809..4be9fba709 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -48,7 +48,7 @@ public class KafkaSinkCommitter implements 
SinkCommitter<KafkaCommitInfo> {
         for (KafkaCommitInfo commitInfo : commitInfos) {
             String transactionId = commitInfo.getTransactionId();
             if (log.isDebugEnabled()) {
-                log.debug("Committing transaction {}", transactionId);
+                log.debug("Committing transaction {}, commitInfo {}", 
transactionId, commitInfo);
             }
             KafkaProducer<?, ?> producer = getProducer(commitInfo);
             producer.commitTransaction();
@@ -87,7 +87,8 @@ public class KafkaSinkCommitter implements 
SinkCommitter<KafkaCommitInfo> {
                     new KafkaInternalProducer<>(
                             commitInfo.getKafkaProperties(), 
commitInfo.getTransactionId());
         }
-        kafkaProducer.resumeTransaction(commitInfo.getProducerId(), 
commitInfo.getEpoch());
+        kafkaProducer.resumeTransaction(
+                commitInfo.getProducerId(), commitInfo.getEpoch(), 
commitInfo.isTxnStarted());
         return kafkaProducer;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 213bb9db57..1f92bcd5b0 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -46,6 +47,7 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     private String transactionId;
     private final String transactionPrefix;
     private final Properties kafkaProperties;
+    private int recordNumInTransaction = 0;
 
     public KafkaTransactionSender(String transactionPrefix, Properties 
kafkaProperties) {
         this.transactionPrefix = transactionPrefix;
@@ -55,6 +57,7 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     @Override
     public void send(ProducerRecord<K, V> producerRecord) {
         kafkaProducer.send(producerRecord);
+        recordNumInTransaction++;
     }
 
     @Override
@@ -62,6 +65,7 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
         this.transactionId = transactionId;
         this.kafkaProducer = getTransactionProducer(kafkaProperties, 
transactionId);
         kafkaProducer.beginTransaction();
+        recordNumInTransaction = 0;
     }
 
     @Override
@@ -71,7 +75,8 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
                         transactionId,
                         kafkaProperties,
                         this.kafkaProducer.getProducerId(),
-                        this.kafkaProducer.getEpoch());
+                        this.kafkaProducer.getEpoch(),
+                        this.kafkaProducer.isTxnStarted());
         return Optional.of(kafkaCommitInfo);
     }
 
@@ -108,6 +113,10 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
 
     @Override
     public List<KafkaSinkState> snapshotState(long checkpointId) {
+        if (recordNumInTransaction == 0) {
+            // KafkaSinkCommitter does not support emptyTransaction, so we 
commit here.
+            kafkaProducer.commitTransaction();
+        }
         return Lists.newArrayList(
                 new KafkaSinkState(
                         transactionId, transactionPrefix, checkpointId, 
kafkaProperties));
@@ -117,7 +126,9 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     public void close() {
         if (kafkaProducer != null) {
             kafkaProducer.flush();
-            kafkaProducer.close();
+            // kafkaProducer will abort the transaction if you call close() 
without a duration arg
+            // which will cause an exception when Committer commit the 
transaction later.
+            kafkaProducer.close(Duration.ZERO);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index 99cc3aaf3c..82ef8af4d3 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -31,4 +31,5 @@ public class KafkaCommitInfo implements Serializable {
     private final Properties kafkaProperties;
     private final long producerId;
     private final short epoch;
+    private final boolean txnStarted;
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 986e5f9f2e..f9483fd65f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -60,6 +60,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -97,11 +98,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
 public class KafkaIT extends TestSuiteBase implements TestResource {
     private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:7.0.9";
@@ -752,6 +756,94 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                 });
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            type = EngineType.SPARK,
+            value = {})
+    public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
+            throws InterruptedException {
+        String producerTopic = "kafka_topic_exactly_once_1";
+        String consumerTopic = "kafka_topic_exactly_once_2";
+        String sourceData = "Seatunnel Exactly Once Example";
+        for (int i = 0; i < 10; i++) {
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(producerTopic, null, 
sourceData.getBytes());
+            producer.send(record);
+            producer.flush();
+        }
+        Long endOffset = 0l;
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(producerTopic));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new 
TopicPartition(producerTopic, 0)));
+            endOffset = offsets.entrySet().iterator().next().getValue();
+        }
+        // async execute
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/kafka/kafka_to_kafka_exactly_once_streaming.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        TimeUnit.MINUTES.sleep(5);
+        // wait for data written to kafka
+        Long finalEndOffset = endOffset;
+        await().atMost(5, TimeUnit.MINUTES)
+                .pollInterval(5000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertTrue(
+                                        checkData(consumerTopic, 
finalEndOffset, sourceData)));
+    }
+
+    @TestTemplate
+    public void testKafkaToKafkaExactlyOnceOnBatch(TestContainer container)
+            throws InterruptedException, IOException {
+        String producerTopic = "kafka_topic_exactly_once_1";
+        String consumerTopic = "kafka_topic_exactly_once_2";
+        String sourceData = "Seatunnel Exactly Once Example";
+        for (int i = 0; i < 10; i++) {
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(producerTopic, null, 
sourceData.getBytes());
+            producer.send(record);
+            producer.flush();
+        }
+        Long endOffset;
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(producerTopic));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new 
TopicPartition(producerTopic, 0)));
+            endOffset = offsets.entrySet().iterator().next().getValue();
+        }
+        Container.ExecResult execResult =
+                
container.executeJob("/kafka/kafka_to_kafka_exactly_once_batch.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // wait for data written to kafka
+        Assertions.assertTrue(checkData(consumerTopic, endOffset, sourceData));
+    }
+
+    // Compare the values of data fields obtained from consumers
+    private boolean checkData(String topicName, long endOffset, String data) {
+        List<String> listData = getKafkaConsumerListData(topicName, endOffset);
+        if (listData.isEmpty() || listData.size() != endOffset) {
+            log.error(
+                    "testKafkaToKafkaExactlyOnce get data size is not 
expect,get consumer data size {}",
+                    listData.size());
+            return false;
+        }
+        for (String value : listData) {
+            if (!data.equals(value)) {
+                log.error("testKafkaToKafkaExactlyOnce get data value is not 
expect");
+                return false;
+            }
+        }
+        return true;
+    }
+
     private @NotNull DefaultSeaTunnelRowSerializer 
getDefaultSeaTunnelRowSerializer(
             String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig 
readonlyConfig) {
         // Create serializer
@@ -934,6 +1026,10 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         props.put(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                 OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        // exactly once semantics must set config read_commit
+        props.put(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                IsolationLevel.READ_COMMITTED.name().toLowerCase());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         return props;
@@ -1067,6 +1163,24 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         return data;
     }
 
+    private List<String> getKafkaConsumerListData(String topicName, long 
endOffset) {
+        List<String> data = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Long lastProcessedOffset = -1L;
+            do {
+                ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
     private List<SeaTunnelRow> getKafkaSTRow(String topicName, 
ConsumerRecordConverter converter) {
         List<SeaTunnelRow> data = new ArrayList<>();
         try (KafkaConsumer<byte[], byte[]> consumer =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
new file mode 100644
index 0000000000..9965f65d92
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+  }
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "kafka_topic_exactly_once_1"
+    # The default format is json, which is optional
+    format = text
+    start_mode = earliest
+  }
+
+}
+transform {}
+
+
+sink{
+  kafka {
+        format = text
+        topic = "kafka_topic_exactly_once_2"
+        bootstrap.servers = "kafkaCluster:9092"
+        semantics = EXACTLY_ONCE
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
new file mode 100644
index 0000000000..6d03997268
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    checkpoint.timeout = 60000
+  }
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "kafka_topic_exactly_once_1"
+    # The default format is json, which is optional
+    format = text
+    start_mode = earliest
+  }
+
+}
+transform {}
+
+
+sink{
+  kafka {
+        format = text
+        topic = "kafka_topic_exactly_once_2"
+        bootstrap.servers = "kafkaCluster:9092"
+        semantics = EXACTLY_ONCE
+    }
+}
\ No newline at end of file

Reply via email to