This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 891652399e [Hotfix][Connector] Fix kafka consumer log next startup offset (#7312)
891652399e is described below

commit 891652399e8b97fb5cf1c7e6bdee87e7ec48469e
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 7 12:46:12 2024 +0800

    [Hotfix][Connector] Fix kafka consumer log next startup offset (#7312)
---
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 10 ++-
 .../seatunnel/kafka/source/KafkaSourceSplit.java   |  4 ++
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 62 +++++++++++++++++
 ...group_offset_to_console_with_commit_offset.conf | 77 ++++++++++++++++++++++
 4 files changed, 151 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 02c2a9007e..6f4753110b 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -218,8 +218,14 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
                     }
                 });
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            finishedSplits.forEach(sourceSplits::remove);
-            if (sourceSplits.isEmpty()) {
+            for (KafkaSourceSplit split : finishedSplits) {
+                split.setFinish(true);
+                if (split.getStartOffset() == -1) {
+                    // log next running read start offset
+                    split.setStartOffset(split.getEndOffset());
+                }
+            }
+            if (sourceSplits.stream().allMatch(KafkaSourceSplit::isFinish)) {
                 context.signalNoMoreElement();
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index 1c7cb17678..8f5bc5f2d3 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 
 import org.apache.kafka.common.TopicPartition;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import java.util.Objects;
 
 public class KafkaSourceSplit implements SourceSplit {
@@ -30,6 +33,7 @@ public class KafkaSourceSplit implements SourceSplit {
     private TopicPartition topicPartition;
     private long startOffset = -1L;
     private long endOffset = -1L;
+    @Setter @Getter private transient volatile boolean finish = false;
 
     public KafkaSourceSplit(TablePath tablePath, TopicPartition 
topicPartition) {
         this.tablePath = tablePath;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 36b25928d9..0d9f5d5ef8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -35,12 +35,15 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -80,6 +83,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -315,6 +319,23 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         testKafkaGroupOffsetsToConsole(container);
     }
 
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "flink and spark won't commit offset when batch 
job finished")
+    @TestTemplate
+    public void testSourceKafkaStartConfigWithCommitOffset(TestContainer 
container)
+            throws Exception {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_group_with_commit_offset",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        testKafkaGroupOffsetsToConsoleWithCommitOffset(container);
+    }
+
     @TestTemplate
     @DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
     public void testFakeSourceToKafkaAvroFormat(TestContainer container)
@@ -511,6 +532,40 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    public void testKafkaGroupOffsetsToConsoleWithCommitOffset(TestContainer 
container)
+            throws IOException, InterruptedException, ExecutionException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        String consumerGroup = "SeaTunnel-Consumer-Group";
+        TopicPartition topicPartition =
+                new TopicPartition("test_topic_group_with_commit_offset", 0);
+        try (AdminClient adminClient = createKafkaAdmin()) {
+            ListConsumerGroupOffsetsOptions options =
+                    new ListConsumerGroupOffsetsOptions()
+                            .topicPartitions(Arrays.asList(topicPartition));
+            Map<TopicPartition, Long> topicOffset =
+                    adminClient
+                            .listConsumerGroupOffsets(consumerGroup, options)
+                            .partitionsToOffsetAndMetadata()
+                            .thenApply(
+                                    result -> {
+                                        Map<TopicPartition, Long> offsets = 
new HashMap<>();
+                                        result.forEach(
+                                                (tp, oam) -> {
+                                                    if (oam != null) {
+                                                        offsets.put(tp, 
oam.offset());
+                                                    }
+                                                });
+                                        return offsets;
+                                    })
+                            .get();
+            Assertions.assertEquals(100L, topicOffset.get(topicPartition));
+        }
+    }
+
     public void testKafkaTimestampToConsole(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
@@ -518,6 +573,13 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    private AdminClient createKafkaAdmin() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return AdminClient.create(props);
+    }
+
     private void initKafkaProducer() {
         Properties props = new Properties();
         String bootstrapServers = kafkaContainer.getBootstrapServers();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf
new file mode 100644
index 0000000000..e054c03964
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console_with_commit_offset.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Kafka {
+    commit_on_checkpoint = true
+    consumer.group = "SeaTunnel-Consumer-Group"
+
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_group_with_commit_offset"
+    result_table_name = "kafka_table"
+    # The default format is json, which is optional
+    format = json
+    start_mode = group_offsets
+    schema = {
+      fields {
+        id = bigint
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    source_table_name = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = bigint
+            field_value = [
+
+              {
+                rule_type = MIN
+                rule_value = 100
+              },
+              {
+                rule_type = MAX
+                rule_value = 149
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file

Reply via email to