This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 891652399e [Hotfix][Connector] Fix kafka consumer log next startup offset (#7312)
891652399e is described below
commit 891652399e8b97fb5cf1c7e6bdee87e7ec48469e
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 7 12:46:12 2024 +0800
[Hotfix][Connector] Fix kafka consumer log next startup offset (#7312)
---
.../seatunnel/kafka/source/KafkaSourceReader.java | 10 ++-
.../seatunnel/kafka/source/KafkaSourceSplit.java | 4 ++
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 62 +++++++++++++++++
...group_offset_to_console_with_commit_offset.conf | 77 ++++++++++++++++++++++
4 files changed, 151 insertions(+), 2 deletions(-)
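
Gist of the reader change below, as a minimal self-contained sketch (the Split class here is a simplified stand-in for KafkaSourceSplit, not the committed code): finished splits are now flagged instead of removed, and a split that never recorded a start offset is stamped with its end offset, so the next startup read position survives to be logged and committed.

    import java.util.Arrays;
    import java.util.List;

    public class BoundedCompletionSketch {

        // Simplified stand-in for KafkaSourceSplit.
        static class Split {
            long startOffset = -1L;
            long endOffset = -1L;
            volatile boolean finish = false;
        }

        // Before this commit the finished splits were removed from sourceSplits,
        // discarding their offsets; now they are only marked as finished.
        static boolean handleFinished(List<Split> sourceSplits, List<Split> finishedSplits) {
            for (Split split : finishedSplits) {
                split.finish = true;
                if (split.startOffset == -1) {
                    // Record the next run's read start offset.
                    split.startOffset = split.endOffset;
                }
            }
            // Signal no-more-elements only once every split has finished.
            return sourceSplits.stream().allMatch(s -> s.finish);
        }

        public static void main(String[] args) {
            Split split = new Split();
            split.endOffset = 100L;
            List<Split> splits = Arrays.asList(split);
            System.out.println(handleFinished(splits, splits)); // true; startOffset is now 100
        }
    }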
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 02c2a9007e..6f4753110b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -218,8 +218,14 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
}
});
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- finishedSplits.forEach(sourceSplits::remove);
- if (sourceSplits.isEmpty()) {
+ for (KafkaSourceSplit split : finishedSplits) {
+ split.setFinish(true);
+ if (split.getStartOffset() == -1) {
+ // log next running read start offset
+ split.setStartOffset(split.getEndOffset());
+ }
+ }
+ if (sourceSplits.stream().allMatch(KafkaSourceSplit::isFinish)) {
context.signalNoMoreElement();
}
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index 1c7cb17678..8f5bc5f2d3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.kafka.common.TopicPartition;
+import lombok.Getter;
+import lombok.Setter;
+
import java.util.Objects;
public class KafkaSourceSplit implements SourceSplit {
@@ -30,6 +33,7 @@ public class KafkaSourceSplit implements SourceSplit {
private TopicPartition topicPartition;
private long startOffset = -1L;
private long endOffset = -1L;
+ @Setter @Getter private transient volatile boolean finish = false;
    public KafkaSourceSplit(TablePath tablePath, TopicPartition topicPartition) {
this.tablePath = tablePath;
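
Design note on the new finish flag (an editorial observation, not from the commit itself): transient presumably keeps this runtime-only marker out of the split's serialized state, while volatile makes updates visible across the reader threads that set and check it.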
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 36b25928d9..0d9f5d5ef8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -35,12 +35,15 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -80,6 +83,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -315,6 +319,23 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
testKafkaGroupOffsetsToConsole(container);
}
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "flink and spark won't commit offset when batch job finished")
+ @TestTemplate
+    public void testSourceKafkaStartConfigWithCommitOffset(TestContainer container)
+ throws Exception {
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_group_with_commit_offset",
+ SEATUNNEL_ROW_TYPE,
+ DEFAULT_FORMAT,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ testKafkaGroupOffsetsToConsoleWithCommitOffset(container);
+ }
+
@TestTemplate
@DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
public void testFakeSourceToKafkaAvroFormat(TestContainer container)
@@ -511,6 +532,40 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
+    public void testKafkaGroupOffsetsToConsoleWithCommitOffset(TestContainer container)
+ throws IOException, InterruptedException, ExecutionException {
+ Container.ExecResult execResult =
+ container.executeJob(
+                        "/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+ String consumerGroup = "SeaTunnel-Consumer-Group";
+ TopicPartition topicPartition =
+ new TopicPartition("test_topic_group_with_commit_offset", 0);
+ try (AdminClient adminClient = createKafkaAdmin()) {
+ ListConsumerGroupOffsetsOptions options =
+ new ListConsumerGroupOffsetsOptions()
+ .topicPartitions(Arrays.asList(topicPartition));
+ Map<TopicPartition, Long> topicOffset =
+ adminClient
+ .listConsumerGroupOffsets(consumerGroup, options)
+ .partitionsToOffsetAndMetadata()
+ .thenApply(
+ result -> {
+                                            Map<TopicPartition, Long> offsets = new HashMap<>();
+ result.forEach(
+ (tp, oam) -> {
+ if (oam != null) {
+                                                        offsets.put(tp, oam.offset());
+ }
+ });
+ return offsets;
+ })
+ .get();
+ Assertions.assertEquals(100L, topicOffset.get(topicPartition));
+ }
+ }
+
public void testKafkaTimestampToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
@@ -518,6 +573,13 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
+ private AdminClient createKafkaAdmin() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ return AdminClient.create(props);
+ }
+
private void initKafkaProducer() {
Properties props = new Properties();
String bootstrapServers = kafkaContainer.getBootstrapServers();
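
Side note on createKafkaAdmin() above: it reuses ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, which resolves to the same "bootstrap.servers" key that AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG names, so the AdminClient is configured correctly either way (AdminClientConfig would be the more idiomatic constant). For reference, a minimal standalone sketch of the same committed-offset lookup (the broker address below is an illustrative assumption; the test resolves it from the Kafka container):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class CommittedOffsetLookup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed broker address for illustration.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Fetch the committed offsets for every partition the group owns.
                admin.listConsumerGroupOffsets("SeaTunnel-Consumer-Group")
                        .partitionsToOffsetAndMetadata()
                        .get()
                        .forEach(
                                (tp, oam) ->
                                        System.out.println(
                                                tp + " -> " + (oam == null ? "none" : oam.offset())));
            }
        }
    }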
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf
new file mode 100644
index 0000000000..e054c03964
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Kafka {
+ commit_on_checkpoint = true
+ consumer.group = "SeaTunnel-Consumer-Group"
+
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_group_with_commit_offset"
+ result_table_name = "kafka_table"
+ # The default format is json, which is optional
+ format = json
+ start_mode = group_offsets
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ }
+
+  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ source_table_name = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = bigint
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 100
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
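
Usage note (editorial, not part of the commit): with job.mode = BATCH, start_mode = group_offsets resumes reading from the consumer group's committed offsets, and commit_on_checkpoint = true commits the new position when the job completes; the test then verifies that committed offset through AdminClient. As the disabledReason above states, Flink and Spark don't commit offsets when a batch job finishes, so only the Zeta engine containers run this test.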