This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bbde7f6339 [Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)
bbde7f6339 is described below

commit bbde7f6339be7501996f24ac9a1a7cb7f690bb69
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 27 10:53:19 2025 +0800

    [Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)
---
 .../kafka/source/KafkaPartitionSplitReader.java    |  57 +--
 .../seatunnel/kafka/source/KafkaSource.java        |   2 +
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  42 +-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 478 ++++++++++++++++++++-
 .../kafka/kafka_dynamic_partition_discovery.conf   |  53 +++
 .../kafkasource_restore_with_earliest_mode.conf    |  45 ++
 .../kafkasource_restore_with_latest_mode.conf      |  45 ++
 ...asource_restore_with_specific_offsets_mode.conf |  48 +++
 .../kafkasource_restore_with_timestamp_mode.conf   |  46 ++
 9 files changed, 764 insertions(+), 52 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
index d7f0dd0d8d..8f17be47e3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWit
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -173,9 +172,7 @@ public class KafkaPartitionSplitReader
         // Assignment.
         List<TopicPartition> newPartitionAssignments = new ArrayList<>();
         // Starting offsets.
-        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
-        List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
-        List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
+        Map<TopicPartition, Long> partitionsStartingOffsets = new HashMap<>();
         // Stopping offsets.
         List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
 
@@ -185,11 +182,7 @@ public class KafkaPartitionSplitReader
                 .forEach(
                         s -> {
                             newPartitionAssignments.add(s.getTopicPartition());
-                            parseStartingOffsets(
-                                    s,
-                                    partitionsStartingFromEarliest,
-                                    partitionsStartingFromLatest,
-                                    partitionsStartingFromSpecifiedOffsets);
+                            parseStartingOffsets(s, partitionsStartingOffsets);
                             parseStoppingOffsets(s, partitionsStoppingAtLatest);
                         });
 
@@ -198,10 +191,7 @@ public class KafkaPartitionSplitReader
         consumer.assign(newPartitionAssignments);
 
         // Seek on the newly assigned partitions to their starting offsets.
-        seekToStartingOffsets(
-                partitionsStartingFromEarliest,
-                partitionsStartingFromLatest,
-                partitionsStartingFromSpecifiedOffsets);
+        seekToStartingOffsets(partitionsStartingOffsets);
         // Setup the stopping offsets.
         acquireAndSetStoppingOffsets(partitionsStoppingAtLatest);
 
@@ -270,26 +260,11 @@ public class KafkaPartitionSplitReader
         stoppingOffsets.putAll(endOffset);
     }
 
-    private void seekToStartingOffsets(
-            List<TopicPartition> partitionsStartingFromEarliest,
-            List<TopicPartition> partitionsStartingFromLatest,
-            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
-
-        if (!partitionsStartingFromEarliest.isEmpty()) {
-            LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
-            consumer.seekToBeginning(partitionsStartingFromEarliest);
-        }
-
-        if (!partitionsStartingFromLatest.isEmpty()) {
-            LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
-            consumer.seekToEnd(partitionsStartingFromLatest);
-        }
-
-        if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
+    private void seekToStartingOffsets(Map<TopicPartition, Long> partitionsStartingOffsets) {
+        if (!partitionsStartingOffsets.isEmpty()) {
             LOG.trace(
-                    "Seeking starting offsets to specified offsets: {}",
-                    partitionsStartingFromSpecifiedOffsets);
-            partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
+                    "Seeking starting offsets to specified offsets: {}", partitionsStartingOffsets);
+            partitionsStartingOffsets.forEach(consumer::seek);
         }
     }
 
@@ -308,22 +283,10 @@ public class KafkaPartitionSplitReader
     }
 
     private void parseStartingOffsets(
-            KafkaSourceSplit split,
-            List<TopicPartition> partitionsStartingFromEarliest,
-            List<TopicPartition> partitionsStartingFromLatest,
-            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+            KafkaSourceSplit split, Map<TopicPartition, Long> partitionsStartingOffsets) {
         TopicPartition tp = split.getTopicPartition();
-        // Parse starting offsets.
-        ConsumerMetadata metadata = kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
-        if (metadata.getStartMode() == StartMode.EARLIEST) {
-            partitionsStartingFromEarliest.add(tp);
-        } else if (metadata.getStartMode() == StartMode.LATEST) {
-            partitionsStartingFromLatest.add(tp);
-        } else if (metadata.getStartMode() == StartMode.GROUP_OFFSETS) {
-            // Do nothing here, the consumer will first try to get the committed offsets of
-            // these partitions by default.
-        } else {
-            partitionsStartingFromSpecifiedOffsets.put(tp, split.getStartOffset());
+        if (split.getStartOffset() >= 0) {
+            partitionsStartingOffsets.put(tp, split.getStartOffset());
         }
     }
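
Note on the reader change above: the three start-mode-specific collections collapse into a single map of explicit starting offsets, and any split whose recorded offset is invalid (< 0) is simply left to the consumer's committed-group-offset / auto.offset.reset fallback. A minimal standalone sketch of that behaviour follows; the class and method names are illustrative only, not part of the connector.

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only (not the connector class): collect explicit starting
// offsets per partition and seek just those; partitions without a valid offset
// are left to the consumer's committed group offset / auto.offset.reset fallback.
class StartingOffsetSketch {

    // A split contributes an entry only when its recorded offset is valid (>= 0),
    // mirroring the simplified parseStartingOffsets in the diff above.
    static void collect(TopicPartition tp, long startOffset, Map<TopicPartition, Long> starting) {
        if (startOffset >= 0) {
            starting.put(tp, startOffset);
        }
    }

    static void seek(KafkaConsumer<byte[], byte[]> consumer, Map<TopicPartition, Long> starting) {
        if (!starting.isEmpty()) {
            starting.forEach(consumer::seek); // KafkaConsumer.seek(TopicPartition, long)
        }
    }
}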
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index c32e9ba2dd..ba399c5d21 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -108,6 +108,7 @@ public class KafkaSource
                 kafkaSourceConfig,
                 enumeratorContext,
                 null,
+                false,
                 getBoundedness() == Boundedness.UNBOUNDED);
     }
 
@@ -119,6 +120,7 @@ public class KafkaSource
                 kafkaSourceConfig,
                 enumeratorContext,
                 checkpointState,
+                true,
                 getBoundedness() == Boundedness.UNBOUNDED);
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 26c61ddb2e..95c4173512 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -38,6 +39,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,11 +75,13 @@ public class KafkaSourceSplitEnumerator
     private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
 
     private boolean isStreamingMode;
+    private final boolean isRestored;
 
     KafkaSourceSplitEnumerator(
             KafkaSourceConfig kafkaSourceConfig,
             Context<KafkaSourceSplit> context,
             KafkaSourceState sourceState,
+            boolean isRestored,
             boolean isStreamingMode) {
         this.kafkaSourceConfig = kafkaSourceConfig;
         this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
@@ -87,6 +91,22 @@ public class KafkaSourceSplitEnumerator
         this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
         this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
         this.isStreamingMode = isStreamingMode;
+        this.isRestored = isRestored;
+
+        if (this.isRestored) {
+            log.info("Task is being restored, forcing start mode to GROUP_OFFSETS for all topics");
+            this.tablePathMetadataMap.forEach(
+                    (tablePath, metadata) -> {
+                        StartMode originalMode = metadata.getStartMode();
+                        if (originalMode != StartMode.GROUP_OFFSETS) {
+                            log.info(
+                                    "Changing start mode from {} to GROUP_OFFSETS for table path: {}",
+                                    originalMode,
+                                    tablePath);
+                            metadata.setStartMode(StartMode.GROUP_OFFSETS);
+                        }
+                    });
+        }
     }
 
     @VisibleForTesting
@@ -102,6 +122,7 @@ public class KafkaSourceSplitEnumerator
         this.kafkaSourceConfig = kafkaSourceConfig;
         this.pendingSplit = pendingSplit;
         this.assignedSplit = assignedSplit;
+        this.isRestored = false;
     }
 
     @VisibleForTesting
@@ -172,7 +193,11 @@ public class KafkaSourceSplitEnumerator
             // Supports topic list fine-grained Settings for kafka consumer configurations
             ConsumerMetadata metadata = tablePathMetadataMap.get(tablePath);
             Set<TopicPartition> topicPartitions = tablePathPartitionMap.get(tablePath);
-            switch (metadata.getStartMode()) {
+
+            StartMode effectiveStartMode =
+                    isRestored ? StartMode.GROUP_OFFSETS : metadata.getStartMode();
+
+            switch (effectiveStartMode) {
                 case EARLIEST:
                     topicPartitionOffsets.putAll(
                             listOffsets(topicPartitions, OffsetSpec.earliest()));
@@ -468,6 +493,21 @@ public class KafkaSourceSplitEnumerator
                         split -> {
                             if (!assignedSplit.containsKey(split.getTopicPartition())) {
                                 if (!pendingSplit.containsKey(split.getTopicPartition())) {
+                                    if (initialized) {
+                                        // For newly discovered partitions, set the start offset to
+                                        // start from the earliest
+                                        try {
+                                            split.setStartOffset(
+                                                    listOffsets(
+                                                                    Collections.singletonList(
+                                                                            split
+                                                                                    .getTopicPartition()),
+                                                                    OffsetSpec.earliest())
+                                                            .get(split.getTopicPartition()));
+                                        } catch (ExecutionException | InterruptedException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
                                     pendingSplit.put(split.getTopicPartition(), split);
                                 }
                             }
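
With the enumerator now forcing GROUP_OFFSETS on restore, the effective starting position of each partition is whatever the consumer group last committed; if nothing has been committed yet, the consumer's auto.offset.reset policy decides. A minimal standalone sketch of probing that committed position with the plain Kafka consumer API follows (illustrative only; the bootstrap address is an assumption, and the group id/topic are taken from the e2e config added further below).

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupOffsetsProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_restore_group"); // consumer.group from the e2e config
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("test_topic_restore_earliest", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // With GROUP_OFFSETS the starting position is the group's committed offset;
            // a null entry means nothing is committed yet and auto.offset.reset decides.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
            System.out.println("Committed offset seen on restore: " + committed.get(tp));
        }
    }
}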
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index bc3d4f2ed5..124ed7ff17 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -54,7 +54,9 @@ import org.apache.seatunnel.format.text.TextSerializationSchema;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -81,6 +83,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
@@ -110,6 +113,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
 
 @Slf4j
@@ -143,7 +148,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         given().ignoreExceptions()
                 .atLeast(100, TimeUnit.MILLISECONDS)
                 .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
+                .atMost(180, SECONDS)
                 .untilAsserted(this::initKafkaProducer);
 
         Properties adminProps = new Properties();
@@ -472,6 +477,471 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         testKafkaTimestampToConsole(container);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            type = {EngineType.SPARK, EngineType.FLINK},
+            value = {})
+    public void testDynamicPartitionDiscovery(TestContainer container)
+            throws InterruptedException, ExecutionException {
+
+        final String sourceTopic = "test_topic_dynamic_partition";
+        final String outputTopic = "test_topic_dynamic_partition_output";
+        final String jobId = "18696753645407";
+
+        // Write initial data to the existing partition (partition 0)
+        for (int i = 0; i < 10; i++) {
+            String message =
+                    String.format(
+                            "{\"id\":%d,\"message\":\"initial_message_%d\",\"timestamp\":%d}",
+                            i, i, System.currentTimeMillis());
+            producer.send(new ProducerRecord<>(sourceTopic, null, message.getBytes()));
+        }
+        producer.flush();
+
+        // Start the streaming job asynchronously
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/kafka/kafka_dynamic_partition_discovery.conf", jobId);
+                    } catch (Exception e) {
+                        log.error("Dynamic partition discovery job execution exception", e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // Wait for job to start and process initial data
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        try (AdminClient adminClient = createKafkaAdmin()) {
+            Map<String, NewPartitions> newPartitions = new HashMap<>();
+            newPartitions.put(sourceTopic, NewPartitions.increaseTo(2));
+            adminClient.createPartitions(newPartitions).all().get();
+            log.info("Successfully created new partition for topic: {}", sourceTopic);
+        }
+
+        Awaitility.await().pollDelay(3, SECONDS).atMost(30, SECONDS).until(() -> true);
+
+        for (int i = 0; i < 15; i++) {
+            String message =
+                    String.format(
+                            "{\"id\":%d,\"message\":\"new_partition_message_%d\",\"timestamp\":%d}",
+                            i + 100, i, System.currentTimeMillis());
+            producer.send(new ProducerRecord<>(sourceTopic, 1, null, message.getBytes()));
+        }
+        producer.flush();
+
+        Awaitility.await()
+                .pollInterval(2, SECONDS)
+                .atMost(2, MINUTES)
+                .until(
+                        () -> {
+                            try {
+                                // Check the output topic data count
+                                List<String> outputData = getKafkaConsumerListData(outputTopic);
+                                log.info("Output topic data count: {}", outputData.size());
+                                return outputData.size() >= 15 && outputData.size() < 25;
+                            } catch (Exception e) {
+                                log.error("Error checking output topic data", e);
+                                return false;
+                            }
+                        });
+
+        try (AdminClient adminClient = createKafkaAdmin()) {
+            Map<String, TopicDescription> topicDescriptions =
+                    adminClient.describeTopics(Arrays.asList(sourceTopic)).allTopicNames().get();
+            TopicDescription topicDescription = topicDescriptions.get(sourceTopic);
+            int partitionCount = topicDescription.partitions().size();
+            log.info("Current partition count for topic {}: {}", sourceTopic, partitionCount);
+            Assertions.assertTrue(partitionCount >= 2, "Partition count should be at least 2");
+        }
+
+        log.info("Dynamic partition discovery test completed successfully");
+    }
+
+    // ------------------------------ restore --------------------------------
+    // ----------------------------- EARLIEST MODE -----------------------------
+    @TestTemplate
+    @DisabledOnContainer(
+            type = {EngineType.SPARK, EngineType.FLINK},
+            value = {})
+    public void testSourceKafkaRestoreWithEarliestMode(TestContainer container)
+            throws IOException, InterruptedException {
+
+        final String sourceTopic = "test_topic_restore_earliest";
+        final String sinkTopic = "test_topic_restore_earliest_output";
+        final String payload = "Seatunnel Restore Test Data";
+        final String jobId = "18696753645408";
+
+        // Write 20 initial records with unique keys (avoid any potential dedup logic
+        // elsewhere).
+        for (int i = 0; i < 20; i++) {
+            producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+        }
+        producer.flush();
+
+        // Capture source end offset (LEO) on partition 0 before starting the job.
+        long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+        // Start the first streaming job asynchronously.
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/kafka/kafkasource_restore_with_earliest_mode.conf", jobId);
+                    } catch (Exception e) {
+                        log.error("First job execution exception", e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // Warm up (simple delay).
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        // Produce 10 additional records after the job starts.
+        for (int i = 0; i < 10; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_additional_" + i).getBytes(),
+                            (payload + "_additional").getBytes()));
+        }
+        producer.flush();
+
+        // In earliest mode, first run should consume at least initial 20 + additional
+        // 10.
+        final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+        Awaitility.await()
+                .pollInterval(2, SECONDS)
+                .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+        // Savepoint the running job (so restore should continue from this position).
+        container.savepointJob(jobId);
+
+        // Append 15 records after savepoint, used to validate restore progress.
+        for (int i = 0; i < 15; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_restore_" + i).getBytes(),
+                            (payload + "_restore").getBytes()));
+        }
+        producer.flush();
+
+        // Source end offset should move forward by at least 25 (10 + 15) from the
+        // captured point.
+        long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by at least 25");
+
+        // Restore the job from the savepoint asynchronously.
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/kafka/kafkasource_restore_with_earliest_mode.conf", jobId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // After restore, sink should advance by the 15 newly produced records at
+        // minimum.
+        Awaitility.await()
+                .pollDelay(3, SECONDS)
+                .pollInterval(2, SECONDS)
+                .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+    }
+
+    // ------------------------------ LATEST MODE ------------------------------
+
+    @TestTemplate
+    @DisabledOnContainer(
+            type = {EngineType.SPARK, EngineType.FLINK},
+            value = {})
+    public void testSourceKafkaRestoreWithLatestMode(TestContainer container)
+            throws IOException, InterruptedException {
+
+        final String sourceTopic = "test_topic_restore_latest";
+        final String sinkTopic = "test_topic_restore_latest_output";
+        final String payload = "Seatunnel Restore Test Data Latest";
+        final String jobId = "18696753645410";
+
+        // Write 20 initial records before starting the job.
+        for (int i = 0; i < 20; i++) {
+            producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+        }
+        producer.flush();
+
+        long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/kafka/kafkasource_restore_with_latest_mode.conf", jobId);
+                    } catch (Exception e) {
+                        log.error("First job execution exception", e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        // Produce 10 records after job start; latest mode should consume only these 10
+        // initially.
+        for (int i = 0; i < 10; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_additional_" + i).getBytes(),
+                            (payload + "_additional").getBytes()));
+        }
+        producer.flush();
+
+        final long expectedSinkAfterFirstRun = 10;
+        Awaitility.await()
+                .pollInterval(2, SECONDS)
+                .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+        container.savepointJob(jobId);
+
+        // Append 15 more records after savepoint.
+        for (int i = 0; i < 15; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_restore_" + i).getBytes(),
+                            (payload + "_restore").getBytes()));
+        }
+        producer.flush();
+
+        long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by at least 25");
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/kafka/kafkasource_restore_with_latest_mode.conf", jobId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await()
+                .pollDelay(3, SECONDS)
+                .pollInterval(2, SECONDS)
+                .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+    }
+
+    // ---------------------------- TIMESTAMP MODE -----------------------------
+
+    @TestTemplate
+    @DisabledOnContainer(
+            type = {EngineType.SPARK, EngineType.FLINK},
+            value = {})
+    public void testSourceKafkaRestoreWithTimestampMode(TestContainer container)
+            throws IOException, InterruptedException {
+
+        final String sourceTopic = "test_topic_restore_timestamp";
+        final String sinkTopic = "test_topic_restore_timestamp_output";
+        final String payload = "Seatunnel Restore Test Data Timestamp";
+        final String jobId = "18696753645411";
+
+        for (int i = 0; i < 20; i++) {
+            producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+        }
+        producer.flush();
+
+        long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/kafka/kafkasource_restore_with_timestamp_mode.conf", jobId);
+                    } catch (Exception e) {
+                        log.error("First job execution exception", e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        // Produce 10 records after job start.
+        for (int i = 0; i < 10; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_additional_" + i).getBytes(),
+                            (payload + "_additional").getBytes()));
+        }
+        producer.flush();
+
+        // Keep original semantics: expected sink count depends on timestamp-based start
+        // config.
+        final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+        Awaitility.await()
+                .pollInterval(2, SECONDS)
+                .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+        container.savepointJob(jobId);
+
+        // Append 15 more records after savepoint.
+        for (int i = 0; i < 15; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_restore_" + i).getBytes(),
+                            (payload + "_restore").getBytes()));
+        }
+        producer.flush();
+
+        long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by at least 25");
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/kafka/kafkasource_restore_with_timestamp_mode.conf", jobId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await()
+                .pollDelay(3, SECONDS)
+                .pollInterval(2, SECONDS)
+                .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+    }
+
+    // ------------------------- SPECIFIC OFFSETS MODE -------------------------
+
+    @TestTemplate
+    @DisabledOnContainer(
+            type = {EngineType.SPARK, EngineType.FLINK},
+            value = {})
+    public void testSourceKafkaRestoreWithSpecificOffsetsMode(TestContainer container)
+            throws IOException, InterruptedException {
+
+        final String sourceTopic = "test_topic_restore_specific_offsets";
+        final String sinkTopic = "test_topic_restore_specific_offsets_output";
+        final String payload = "Seatunnel Restore Test Data Specific Offsets";
+        final String jobId = "18696753645412";
+
+        for (int i = 0; i < 20; i++) {
+            producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+        }
+        producer.flush();
+
+        long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/kafka/kafkasource_restore_with_specific_offsets_mode.conf",
+                                jobId);
+                    } catch (Exception e) {
+                        log.error("First job execution exception", e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        // Produce 10 records after job start.
+        for (int i = 0; i < 10; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_additional_" + i).getBytes(),
+                            (payload + "_additional").getBytes()));
+        }
+        producer.flush();
+
+        // Keep original semantics: expected sink count depends on explicit offset
+        // config. -> 11
+        final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+        Awaitility.await()
+                .pollInterval(2, SECONDS)
+                .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun - 11);
+
+        container.savepointJob(jobId);
+
+        // Append 15 more records after savepoint.
+        for (int i = 0; i < 15; i++) {
+            producer.send(
+                    new ProducerRecord<>(
+                            sourceTopic,
+                            ("key_restore_" + i).getBytes(),
+                            (payload + "_restore").getBytes()));
+        }
+        producer.flush();
+
+        long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by at least 25");
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/kafka/kafkasource_restore_with_specific_offsets_mode.conf",
+                                jobId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await()
+                .pollDelay(3, SECONDS)
+                .pollInterval(2, SECONDS)
+                .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15 - 11);
+    }
+
+    /**
+     * Get visible record count on partition-0: endOffset - beginningOffset (exclusive upper bound).
+     */
+    private long visibleCountOnP0(String topic) {
+        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            TopicPartition tp0 = new TopicPartition(topic, 0);
+            c.assign(Collections.singletonList(tp0));
+            long begin = c.beginningOffsets(Collections.singletonList(tp0)).get(tp0);
+            long end = c.endOffsets(Collections.singletonList(tp0)).get(tp0);
+            return end - begin;
+        }
+    }
+
+    /** Get the current end offset (LEO) on partition-0. */
+    private long endOffsetOnP0(String topic) {
+        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            TopicPartition tp0 = new TopicPartition(topic, 0);
+            c.assign(Collections.singletonList(tp0));
+            return c.endOffsets(Collections.singletonList(tp0)).get(tp0);
+        }
+    }
+
     @TestTemplate
     public void testSourceKafkaWithEndTimestamp(TestContainer container)
             throws IOException, InterruptedException {
@@ -973,10 +1443,10 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 });
         // wait for data written to kafka
         Long finalEndOffset = endOffset;
-        given().pollDelay(30, TimeUnit.SECONDS)
-                .pollInterval(5, TimeUnit.SECONDS)
+        given().pollDelay(30, SECONDS)
+                .pollInterval(5, SECONDS)
                 .await()
-                .atMost(5, TimeUnit.MINUTES)
+                .atMost(5, MINUTES)
                 .untilAsserted(
                         () ->
                                 Assertions.assertTrue(
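
The restore tests above track progress through partition end offsets before and after the savepoint/restore cycle. A test could equally assert against the consumer group's committed offsets, which is what the restored enumerator now resumes from; a small hypothetical helper in the same spirit as visibleCountOnP0/endOffsetOnP0 is sketched below (not part of the commit, names are illustrative).

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: read the group's committed offset so a test can assert that a
// restored job resumes from the savepointed position rather than from its start mode.
class CommittedOffsetHelper {

    static long committedOffset(String bootstrapServers, String groupId, TopicPartition tp)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            OffsetAndMetadata meta = offsets.get(tp);
            return meta == null ? -1L : meta.offset(); // -1 => nothing committed yet
        }
    }
}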
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf
new file mode 100644
index 0000000000..b1de6df0fa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  job.mode = "STREAMING"
+  parallelism = 2
+  checkpoint.interval = 5000
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_dynamic_partition"
+    plugin_output = "kafka_table"
+    start_mode = latest
+    partition-discovery.interval-millis = 5000
+    consumer.group = "seatunnel-dynamic-partition-test-group"
+    format = json
+    schema = {
+      fields {
+        id = bigint
+        message = string
+        timestamp = bigint
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_dynamic_partition_output"
+    plugin_input = "kafka_table"
+    format = json
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf
new file mode 100644
index 0000000000..ac4c7d9117
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    checkpoint.timeout = 60000
+}
+
+source {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_earliest"
+        format = text
+        start_mode = earliest
+        consumer.group = "test_restore_group"
+        discovery.interval.millis = 10000
+    }
+}
+
+transform {
+}
+
+sink {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_earliest_output"
+        format = text
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf
new file mode 100644
index 0000000000..00b7b38c0a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    checkpoint.timeout = 60000
+}
+
+source {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_latest"
+        format = text
+        start_mode = latest
+        consumer.group = "test_restore_latest"
+        discovery.interval.millis = 10000
+    }
+}
+
+transform {
+}
+
+sink {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_latest_output"
+        format = text
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf
new file mode 100644
index 0000000000..bc929bb24b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    checkpoint.timeout = 60000
+}
+
+source {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_specific_offsets"
+        format = text
+        start_mode = specific_offsets
+        start_mode.offsets = {
+            test_topic_restore_specific_offsets-0 = 11
+        }
+        consumer.group = "test_restore_specific_offsets"
+        discovery.interval.millis = 10000
+    }
+}
+
+transform {
+}
+
+sink {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_specific_offsets_output"
+        format = text
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf
new file mode 100644
index 0000000000..f42dae81f5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    checkpoint.timeout = 60000
+}
+
+source {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_timestamp"
+        format = text
+        start_mode = timestamp
+        start_mode.timestamp = 1738395840000
+        consumer.group = "test_restore_timestamp"
+        discovery.interval.millis = 10000
+    }
+}
+
+transform {
+}
+
+sink {
+    Kafka {
+        bootstrap.servers = "kafkaCluster:9092"
+        topic = "test_topic_restore_timestamp_output"
+        format = text
+    }
+}
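
In the timestamp config above, start_mode.timestamp = 1738395840000 is resolved to the first offset whose record timestamp is at or after that value, the standard Kafka timestamp lookup. A minimal standalone sketch of that lookup with the consumer API follows (illustrative only, not SeaTunnel's internal code path; the bootstrap address is an assumption, and a null result means no matching record exists).

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TimestampOffsetLookup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("test_topic_restore_timestamp", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // offsetsForTimes returns the earliest offset whose record timestamp is >= the
            // requested timestamp, or null when no such record exists on the partition.
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 1738395840000L));
            OffsetAndTimestamp oat = result.get(tp);
            System.out.println(oat == null ? "no record at/after timestamp" : "start offset: " + oat.offset());
        }
    }
}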

