This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 92b3253a5b [Hotfix][Connector-V2][kafka] fix kafka sink config
exactly-once exception (#7857)
92b3253a5b is described below
commit 92b3253a5b24b71c941974c06c433d9f3bace07a
Author: fcb-xiaobo <[email protected]>
AuthorDate: Mon Dec 9 10:37:56 2024 +0800
[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception
(#7857)
---
.../kafka/sink/KafkaInternalProducer.java | 26 ++++-
.../seatunnel/kafka/sink/KafkaSinkCommitter.java | 5 +-
.../kafka/sink/KafkaTransactionSender.java | 15 ++-
.../seatunnel/kafka/state/KafkaCommitInfo.java | 1 +
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 114 +++++++++++++++++++++
.../kafka/kafka_to_kafka_exactly_once_batch.conf | 42 ++++++++
.../kafka_to_kafka_exactly_once_streaming.conf | 44 ++++++++
7 files changed, 241 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 515610e9dd..33d2caeb93 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -55,11 +55,17 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
@Override
public void beginTransaction() throws ProducerFencedException {
+ if (log.isDebugEnabled()) {
+ log.debug("KafkaInternalProducer.beginTransaction. " +
this.transactionalId);
+ }
super.beginTransaction();
}
@Override
public void commitTransaction() throws ProducerFencedException {
+ if (log.isDebugEnabled()) {
+ log.debug("KafkaInternalProducer.commitTransaction." +
this.transactionalId);
+ }
super.commitTransaction();
}
@@ -69,7 +75,18 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
}
public void setTransactionalId(String transactionalId) {
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "KafkaInternalProducer.setTransactionalId. Target
transactionalId="
+                            + transactionalId);
+        }
if (!transactionalId.equals(this.transactionalId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "KafkaInternalProducer.setTransactionalId. Current
transactionalId={} does not match target transactionalId={}",
+                        this.transactionalId,
+                        transactionalId);
+            }
Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
ReflectionUtils.setField(transactionManager,
"transactionalId", transactionalId);
@@ -97,7 +114,7 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
return (long) ReflectionUtils.getField(producerIdAndEpoch,
"producerId").get();
}
- public void resumeTransaction(long producerId, short epoch) {
+ public void resumeTransaction(long producerId, short epoch, boolean
txnStarted) {
log.info(
"Attempting to resume transaction {} with producerId {} and
epoch {}",
@@ -125,10 +142,15 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
transitionTransactionManagerStateTo(transactionManager, "READY");
transitionTransactionManagerStateTo(transactionManager,
"IN_TRANSACTION");
- ReflectionUtils.setField(transactionManager, "transactionStarted",
true);
+ ReflectionUtils.setField(transactionManager, "transactionStarted",
txnStarted);
}
}
+ public boolean isTxnStarted() {
+ Object transactionManager = getTransactionManager();
+ return (boolean) ReflectionUtils.getField(transactionManager,
"transactionStarted").get();
+ }
+
private static Object createProducerIdAndEpoch(long producerId, short
epoch) {
try {
Field field =
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index ed4e280809..4be9fba709 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -48,7 +48,7 @@ public class KafkaSinkCommitter implements
SinkCommitter<KafkaCommitInfo> {
for (KafkaCommitInfo commitInfo : commitInfos) {
String transactionId = commitInfo.getTransactionId();
if (log.isDebugEnabled()) {
- log.debug("Committing transaction {}", transactionId);
+ log.debug("Committing transaction {}, commitInfo {}",
transactionId, commitInfo);
}
KafkaProducer<?, ?> producer = getProducer(commitInfo);
producer.commitTransaction();
@@ -87,7 +87,8 @@ public class KafkaSinkCommitter implements
SinkCommitter<KafkaCommitInfo> {
new KafkaInternalProducer<>(
commitInfo.getKafkaProperties(),
commitInfo.getTransactionId());
}
- kafkaProducer.resumeTransaction(commitInfo.getProducerId(),
commitInfo.getEpoch());
+ kafkaProducer.resumeTransaction(
+ commitInfo.getProducerId(), commitInfo.getEpoch(),
commitInfo.isTxnStarted());
return kafkaProducer;
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 213bb9db57..1f92bcd5b0 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import lombok.extern.slf4j.Slf4j;
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -46,6 +47,7 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
private String transactionId;
private final String transactionPrefix;
private final Properties kafkaProperties;
+ private int recordNumInTransaction = 0;
public KafkaTransactionSender(String transactionPrefix, Properties
kafkaProperties) {
this.transactionPrefix = transactionPrefix;
@@ -55,6 +57,7 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
@Override
public void send(ProducerRecord<K, V> producerRecord) {
kafkaProducer.send(producerRecord);
+ recordNumInTransaction++;
}
@Override
@@ -62,6 +65,7 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
this.transactionId = transactionId;
this.kafkaProducer = getTransactionProducer(kafkaProperties,
transactionId);
kafkaProducer.beginTransaction();
+ recordNumInTransaction = 0;
}
@Override
@@ -71,7 +75,8 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
transactionId,
kafkaProperties,
this.kafkaProducer.getProducerId(),
- this.kafkaProducer.getEpoch());
+ this.kafkaProducer.getEpoch(),
+ this.kafkaProducer.isTxnStarted());
return Optional.of(kafkaCommitInfo);
}
@@ -108,6 +113,10 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
@Override
public List<KafkaSinkState> snapshotState(long checkpointId) {
+ if (recordNumInTransaction == 0) {
+ // KafkaSinkCommitter does not support emptyTransaction, so we
commit here.
+ kafkaProducer.commitTransaction();
+ }
return Lists.newArrayList(
new KafkaSinkState(
transactionId, transactionPrefix, checkpointId,
kafkaProperties));
@@ -117,7 +126,9 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
public void close() {
if (kafkaProducer != null) {
kafkaProducer.flush();
- kafkaProducer.close();
+ // kafkaProducer will abort the transaction if you call close()
without a duration arg
+ // which will cause an exception when Committer commit the
transaction later.
+ kafkaProducer.close(Duration.ZERO);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index 99cc3aaf3c..82ef8af4d3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -31,4 +31,5 @@ public class KafkaCommitInfo implements Serializable {
private final Properties kafkaProperties;
private final long producerId;
private final short epoch;
+ private final boolean txnStarted;
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 986e5f9f2e..f9483fd65f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -60,6 +60,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -97,11 +98,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.awaitility.Awaitility.await;
+
@Slf4j
public class KafkaIT extends TestSuiteBase implements TestResource {
private static final String KAFKA_IMAGE_NAME =
"confluentinc/cp-kafka:7.0.9";
@@ -752,6 +756,94 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ @DisabledOnContainer(
+ type = EngineType.SPARK,
+ value = {})
+ public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
+ throws InterruptedException {
+ String producerTopic = "kafka_topic_exactly_once_1";
+ String consumerTopic = "kafka_topic_exactly_once_2";
+ String sourceData = "Seatunnel Exactly Once Example";
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(producerTopic, null,
sourceData.getBytes());
+ producer.send(record);
+ producer.flush();
+ }
+        Long endOffset = 0L;
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(producerTopic));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new
TopicPartition(producerTopic, 0)));
+ endOffset = offsets.entrySet().iterator().next().getValue();
+ }
+ // async execute
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/kafka/kafka_to_kafka_exactly_once_streaming.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ TimeUnit.MINUTES.sleep(5);
+ // wait for data written to kafka
+ Long finalEndOffset = endOffset;
+ await().atMost(5, TimeUnit.MINUTES)
+ .pollInterval(5000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ checkData(consumerTopic,
finalEndOffset, sourceData)));
+ }
+
+ @TestTemplate
+ public void testKafkaToKafkaExactlyOnceOnBatch(TestContainer container)
+ throws InterruptedException, IOException {
+ String producerTopic = "kafka_topic_exactly_once_1";
+ String consumerTopic = "kafka_topic_exactly_once_2";
+ String sourceData = "Seatunnel Exactly Once Example";
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(producerTopic, null,
sourceData.getBytes());
+ producer.send(record);
+ producer.flush();
+ }
+ Long endOffset;
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(producerTopic));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new
TopicPartition(producerTopic, 0)));
+ endOffset = offsets.entrySet().iterator().next().getValue();
+ }
+ Container.ExecResult execResult =
+
container.executeJob("/kafka/kafka_to_kafka_exactly_once_batch.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // wait for data written to kafka
+ Assertions.assertTrue(checkData(consumerTopic, endOffset, sourceData));
+ }
+
+ // Compare the values of data fields obtained from consumers
+ private boolean checkData(String topicName, long endOffset, String data) {
+ List<String> listData = getKafkaConsumerListData(topicName, endOffset);
+ if (listData.isEmpty() || listData.size() != endOffset) {
+            log.error(
+                    "testKafkaToKafkaExactlyOnce data size is not as
expected; got consumer data size {}",
+                    listData.size());
+ return false;
+ }
+ for (String value : listData) {
+ if (!data.equals(value)) {
+                log.error("testKafkaToKafkaExactlyOnce data value is not as
expected");
+ return false;
+ }
+ }
+ return true;
+ }
+
private @NotNull DefaultSeaTunnelRowSerializer
getDefaultSeaTunnelRowSerializer(
String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig
readonlyConfig) {
// Create serializer
@@ -934,6 +1026,10 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        // exactly-once semantics requires the consumer isolation level read_committed
+ props.put(
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+ IsolationLevel.READ_COMMITTED.name().toLowerCase());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return props;
@@ -1067,6 +1163,24 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
return data;
}
+ private List<String> getKafkaConsumerListData(String topicName, long
endOffset) {
+ List<String> data = new ArrayList<>();
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Long lastProcessedOffset = -1L;
+ do {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.add(record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+
private List<SeaTunnelRow> getKafkaSTRow(String topicName,
ConsumerRecordConverter converter) {
List<SeaTunnelRow> data = new ArrayList<>();
try (KafkaConsumer<byte[], byte[]> consumer =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
new file mode 100644
index 0000000000..9965f65d92
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_batch.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ }
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "kafka_topic_exactly_once_1"
+ # The default format is json, which is optional
+ format = text
+ start_mode = earliest
+ }
+
+}
+transform {}
+
+
+sink{
+ kafka {
+ format = text
+ topic = "kafka_topic_exactly_once_2"
+ bootstrap.servers = "kafkaCluster:9092"
+ semantics = EXACTLY_ONCE
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
new file mode 100644
index 0000000000..6d03997268
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ checkpoint.timeout = 60000
+ }
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "kafka_topic_exactly_once_1"
+ # The default format is json, which is optional
+ format = text
+ start_mode = earliest
+ }
+
+}
+transform {}
+
+
+sink{
+ kafka {
+ format = text
+ topic = "kafka_topic_exactly_once_2"
+ bootstrap.servers = "kafkaCluster:9092"
+ semantics = EXACTLY_ONCE
+ }
+}
\ No newline at end of file