This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new bbde7f6339 [Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)
bbde7f6339 is described below
commit bbde7f6339be7501996f24ac9a1a7cb7f690bb69
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 27 10:53:19 2025 +0800
[Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)
---
.../kafka/source/KafkaPartitionSplitReader.java | 57 +--
.../seatunnel/kafka/source/KafkaSource.java | 2 +
.../kafka/source/KafkaSourceSplitEnumerator.java | 42 +-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 478 ++++++++++++++++++++-
.../kafka/kafka_dynamic_partition_discovery.conf | 53 +++
.../kafkasource_restore_with_earliest_mode.conf | 45 ++
.../kafkasource_restore_with_latest_mode.conf | 45 ++
...asource_restore_with_specific_offsets_mode.conf | 48 +++
.../kafkasource_restore_with_timestamp_mode.conf | 46 ++
9 files changed, 764 insertions(+), 52 deletions(-)
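The behavioral core of this patch is small: when a job is restored from a checkpoint or savepoint, the enumerator now forces GROUP_OFFSETS, so the configured earliest/latest/timestamp/specific_offsets start mode only governs a job's first run, and a restored job always resumes from the consumer group's committed offsets. A minimal standalone sketch of that resolution rule (the enum below is a local stand-in, not the connector's StartMode class):

    // Local stand-in for org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode.
    enum StartMode { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS, GROUP_OFFSETS }

    final class StartModeResolver {
        /** On restore, committed group offsets win; otherwise honor the configured mode. */
        static StartMode effective(StartMode configured, boolean isRestored) {
            return isRestored ? StartMode.GROUP_OFFSETS : configured;
        }
    }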
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
index d7f0dd0d8d..8f17be47e3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWit
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -173,9 +172,7 @@ public class KafkaPartitionSplitReader
// Assignment.
List<TopicPartition> newPartitionAssignments = new ArrayList<>();
// Starting offsets.
-        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
-        List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
- List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
+ Map<TopicPartition, Long> partitionsStartingOffsets = new HashMap<>();
// Stopping offsets.
List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
@@ -185,11 +182,7 @@ public class KafkaPartitionSplitReader
.forEach(
s -> {
newPartitionAssignments.add(s.getTopicPartition());
- parseStartingOffsets(
- s,
- partitionsStartingFromEarliest,
- partitionsStartingFromLatest,
- partitionsStartingFromSpecifiedOffsets);
+ parseStartingOffsets(s, partitionsStartingOffsets);
                            parseStoppingOffsets(s, partitionsStoppingAtLatest);
});
@@ -198,10 +191,7 @@ public class KafkaPartitionSplitReader
consumer.assign(newPartitionAssignments);
        // Seek on the newly assigned partitions to their starting offsets.
- seekToStartingOffsets(
- partitionsStartingFromEarliest,
- partitionsStartingFromLatest,
- partitionsStartingFromSpecifiedOffsets);
+ seekToStartingOffsets(partitionsStartingOffsets);
// Setup the stopping offsets.
acquireAndSetStoppingOffsets(partitionsStoppingAtLatest);
@@ -270,26 +260,11 @@ public class KafkaPartitionSplitReader
stoppingOffsets.putAll(endOffset);
}
- private void seekToStartingOffsets(
- List<TopicPartition> partitionsStartingFromEarliest,
- List<TopicPartition> partitionsStartingFromLatest,
- Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
-
- if (!partitionsStartingFromEarliest.isEmpty()) {
-            LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
- consumer.seekToBeginning(partitionsStartingFromEarliest);
- }
-
- if (!partitionsStartingFromLatest.isEmpty()) {
-            LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
- consumer.seekToEnd(partitionsStartingFromLatest);
- }
-
- if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
+    private void seekToStartingOffsets(Map<TopicPartition, Long> partitionsStartingOffsets) {
+ if (!partitionsStartingOffsets.isEmpty()) {
LOG.trace(
- "Seeking starting offsets to specified offsets: {}",
- partitionsStartingFromSpecifiedOffsets);
- partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
+                    "Seeking starting offsets to specified offsets: {}", partitionsStartingOffsets);
+ partitionsStartingOffsets.forEach(consumer::seek);
}
}
@@ -308,22 +283,10 @@ public class KafkaPartitionSplitReader
}
private void parseStartingOffsets(
- KafkaSourceSplit split,
- List<TopicPartition> partitionsStartingFromEarliest,
- List<TopicPartition> partitionsStartingFromLatest,
- Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+            KafkaSourceSplit split, Map<TopicPartition, Long> partitionsStartingOffsets) {
TopicPartition tp = split.getTopicPartition();
- // Parse starting offsets.
-        ConsumerMetadata metadata = kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
- if (metadata.getStartMode() == StartMode.EARLIEST) {
- partitionsStartingFromEarliest.add(tp);
- } else if (metadata.getStartMode() == StartMode.LATEST) {
- partitionsStartingFromLatest.add(tp);
- } else if (metadata.getStartMode() == StartMode.GROUP_OFFSETS) {
-            // Do nothing here, the consumer will first try to get the committed offsets of
- // these partitions by default.
- } else {
-            partitionsStartingFromSpecifiedOffsets.put(tp, split.getStartOffset());
+ if (split.getStartOffset() >= 0) {
+ partitionsStartingOffsets.put(tp, split.getStartOffset());
}
}
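The reader-side refactor above collapses the three per-mode collections into one offset map: a split is seeked only when it carries a concrete start offset (>= 0); all other partitions are left untouched, so the consumer resumes from the group's committed offsets and otherwise falls back to its auto.offset.reset policy. A sketch of the resulting seek step against a plain KafkaConsumer (the class and wiring here are illustrative, not the connector's):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class SeekSketch {
        /** Seek only partitions with an explicit starting offset; skip the rest. */
        static void seekToStartingOffsets(
                KafkaConsumer<byte[], byte[]> consumer,
                Map<TopicPartition, Long> startingOffsets) {
            // Partitions absent from the map keep consumer-group semantics:
            // committed offsets if present, else the auto.offset.reset policy.
            startingOffsets.forEach(consumer::seek);
        }
    }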
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index c32e9ba2dd..ba399c5d21 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -108,6 +108,7 @@ public class KafkaSource
kafkaSourceConfig,
enumeratorContext,
null,
+ false,
getBoundedness() == Boundedness.UNBOUNDED);
}
@@ -119,6 +120,7 @@ public class KafkaSource
kafkaSourceConfig,
enumeratorContext,
checkpointState,
+ true,
getBoundedness() == Boundedness.UNBOUNDED);
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 26c61ddb2e..95c4173512 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -38,6 +39,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,11 +75,13 @@ public class KafkaSourceSplitEnumerator
    private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
private boolean isStreamingMode;
+ private final boolean isRestored;
KafkaSourceSplitEnumerator(
KafkaSourceConfig kafkaSourceConfig,
Context<KafkaSourceSplit> context,
KafkaSourceState sourceState,
+ boolean isRestored,
boolean isStreamingMode) {
this.kafkaSourceConfig = kafkaSourceConfig;
this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
@@ -87,6 +91,22 @@ public class KafkaSourceSplitEnumerator
        this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
        this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
this.isStreamingMode = isStreamingMode;
+ this.isRestored = isRestored;
+
+ if (this.isRestored) {
+            log.info("Task is being restored, forcing start mode to GROUP_OFFSETS for all topics");
+ this.tablePathMetadataMap.forEach(
+ (tablePath, metadata) -> {
+ StartMode originalMode = metadata.getStartMode();
+ if (originalMode != StartMode.GROUP_OFFSETS) {
+ log.info(
+                                "Changing start mode from {} to GROUP_OFFSETS for table path: {}",
+ originalMode,
+ tablePath);
+ metadata.setStartMode(StartMode.GROUP_OFFSETS);
+ }
+ });
+ }
}
@VisibleForTesting
@@ -102,6 +122,7 @@ public class KafkaSourceSplitEnumerator
this.kafkaSourceConfig = kafkaSourceConfig;
this.pendingSplit = pendingSplit;
this.assignedSplit = assignedSplit;
+ this.isRestored = false;
}
@VisibleForTesting
@@ -172,7 +193,11 @@ public class KafkaSourceSplitEnumerator
        // Supports topic-list fine-grained settings for kafka consumer configurations
        ConsumerMetadata metadata = tablePathMetadataMap.get(tablePath);
        Set<TopicPartition> topicPartitions = tablePathPartitionMap.get(tablePath);
- switch (metadata.getStartMode()) {
+
+ StartMode effectiveStartMode =
+                isRestored ? StartMode.GROUP_OFFSETS : metadata.getStartMode();
+
+ switch (effectiveStartMode) {
case EARLIEST:
topicPartitionOffsets.putAll(
                        listOffsets(topicPartitions, OffsetSpec.earliest()));
@@ -468,6 +493,21 @@ public class KafkaSourceSplitEnumerator
split -> {
                            if (!assignedSplit.containsKey(split.getTopicPartition())) {
                                if (!pendingSplit.containsKey(split.getTopicPartition())) {
+                                    if (initialized) {
+                                        // For newly discovered partitions, set the start offset to
+                                        // start from the earliest
+                                        try {
+                                            split.setStartOffset(
+                                                    listOffsets(
+                                                            Collections.singletonList(
+                                                                    split.getTopicPartition()),
+                                                            OffsetSpec.earliest())
+                                                            .get(split.getTopicPartition()));
+                                        } catch (ExecutionException | InterruptedException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
pendingSplit.put(split.getTopicPartition(), split);
}
}
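The last hunk above pins partitions discovered after initialization (for example, after a topic is expanded) to their earliest offset, so records written to a new partition before it is noticed are not skipped. The diff goes through the enumerator's own listOffsets helper; a self-contained equivalent using the standard Kafka AdminClient API would look roughly like this (the wrapper class is assumed, the Kafka calls are real):

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    final class EarliestOffsetSketch {
        /** Resolve the earliest offset of a single partition via the admin client. */
        static long earliestOffset(AdminClient admin, TopicPartition tp)
                throws ExecutionException, InterruptedException {
            Map<TopicPartition, ListOffsetsResultInfo> offsets =
                    admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
                            .all()
                            .get();
            return offsets.get(tp).offset();
        }
    }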
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index bc3d4f2ed5..124ed7ff17 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -54,7 +54,9 @@ import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -81,6 +83,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -110,6 +113,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
@Slf4j
@@ -143,7 +148,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
given().ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
+ .atMost(180, SECONDS)
.untilAsserted(this::initKafkaProducer);
Properties adminProps = new Properties();
@@ -472,6 +477,471 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
testKafkaTimestampToConsole(container);
}
+ @TestTemplate
+ @DisabledOnContainer(
+ type = {EngineType.SPARK, EngineType.FLINK},
+ value = {})
+ public void testDynamicPartitionDiscovery(TestContainer container)
+ throws InterruptedException, ExecutionException {
+
+ final String sourceTopic = "test_topic_dynamic_partition";
+ final String outputTopic = "test_topic_dynamic_partition_output";
+ final String jobId = "18696753645407";
+
+ // Write initial data to the existing partition (partition 0)
+ for (int i = 0; i < 10; i++) {
+ String message =
+ String.format(
+                            "{\"id\":%d,\"message\":\"initial_message_%d\",\"timestamp\":%d}",
+                            i, i, System.currentTimeMillis());
+            producer.send(new ProducerRecord<>(sourceTopic, null, message.getBytes()));
+ }
+ producer.flush();
+
+ // Start the streaming job asynchronously
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+                                "/kafka/kafka_dynamic_partition_discovery.conf", jobId);
+ } catch (Exception e) {
+                        log.error("Dynamic partition discovery job execution exception", e);
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for job to start and process initial data
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+ try (AdminClient adminClient = createKafkaAdmin()) {
+ Map<String, NewPartitions> newPartitions = new HashMap<>();
+ newPartitions.put(sourceTopic, NewPartitions.increaseTo(2));
+ adminClient.createPartitions(newPartitions).all().get();
+            log.info("Successfully created new partition for topic: {}", sourceTopic);
+ }
+
+        Awaitility.await().pollDelay(3, SECONDS).atMost(30, SECONDS).until(() -> true);
+
+ for (int i = 0; i < 15; i++) {
+ String message =
+ String.format(
+                            "{\"id\":%d,\"message\":\"new_partition_message_%d\",\"timestamp\":%d}",
+                            i + 100, i, System.currentTimeMillis());
+            producer.send(new ProducerRecord<>(sourceTopic, 1, null, message.getBytes()));
+ }
+ producer.flush();
+
+ Awaitility.await()
+ .pollInterval(2, SECONDS)
+ .atMost(2, MINUTES)
+ .until(
+ () -> {
+ try {
+ // Check the output topic data count
+                                List<String> outputData = getKafkaConsumerListData(outputTopic);
+                                log.info("Output topic data count: {}", outputData.size());
+                                return outputData.size() >= 15 && outputData.size() < 25;
+                            } catch (Exception e) {
+                                log.error("Error checking output topic data", e);
+ return false;
+ }
+ });
+
+ try (AdminClient adminClient = createKafkaAdmin()) {
+            Map<String, TopicDescription> topicDescriptions =
+                    adminClient.describeTopics(Arrays.asList(sourceTopic)).allTopicNames().get();
+            TopicDescription topicDescription = topicDescriptions.get(sourceTopic);
+            int partitionCount = topicDescription.partitions().size();
+            log.info("Current partition count for topic {}: {}", sourceTopic, partitionCount);
+            Assertions.assertTrue(partitionCount >= 2, "Partition count should be at least 2");
+ }
+
+ log.info("Dynamic partition discovery test completed successfully");
+ }
+
+ // ------------------------------ restore --------------------------------
+    // ----------------------------- EARLIEST MODE -----------------------------
+ @TestTemplate
+ @DisabledOnContainer(
+ type = {EngineType.SPARK, EngineType.FLINK},
+ value = {})
+ public void testSourceKafkaRestoreWithEarliestMode(TestContainer container)
+ throws IOException, InterruptedException {
+
+ final String sourceTopic = "test_topic_restore_earliest";
+ final String sinkTopic = "test_topic_restore_earliest_output";
+ final String payload = "Seatunnel Restore Test Data";
+ final String jobId = "18696753645408";
+
+        // Write 20 initial records with unique keys (avoid any potential dedup logic elsewhere).
+ for (int i = 0; i < 20; i++) {
+ producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+ }
+ producer.flush();
+
+        // Capture source end offset (LEO) on partition 0 before starting the job.
+ long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+ // Start the first streaming job asynchronously.
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+                                "/kafka/kafkasource_restore_with_earliest_mode.conf", jobId);
+ } catch (Exception e) {
+ log.error("First job execution exception", e);
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Warm up (simple delay).
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+ // Produce 10 additional records after the job starts.
+ for (int i = 0; i < 10; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_additional_" + i).getBytes(),
+ (payload + "_additional").getBytes()));
+ }
+ producer.flush();
+
+        // In earliest mode, the first run consumes the 20 initial records plus the 10 additional ones.
+ final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+ Awaitility.await()
+ .pollInterval(2, SECONDS)
+ .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+        // Savepoint the running job (so restore should continue from this position).
+ container.savepointJob(jobId);
+
+        // Append 15 records after savepoint, used to validate restore progress.
+ for (int i = 0; i < 15; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_restore_" + i).getBytes(),
+ (payload + "_restore").getBytes()));
+ }
+ producer.flush();
+
+        // Source end offset should move forward by exactly 25 (10 + 15) from the captured point.
+        long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by exactly 25");
+
+ // Restore the job from the savepoint asynchronously.
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(
+                                "/kafka/kafkasource_restore_with_earliest_mode.conf", jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+        // After restore, the sink should advance by exactly the 15 newly produced records.
+ Awaitility.await()
+ .pollDelay(3, SECONDS)
+ .pollInterval(2, SECONDS)
+ .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+ }
+
+    // ------------------------------ LATEST MODE ------------------------------
+
+ @TestTemplate
+ @DisabledOnContainer(
+ type = {EngineType.SPARK, EngineType.FLINK},
+ value = {})
+ public void testSourceKafkaRestoreWithLatestMode(TestContainer container)
+ throws IOException, InterruptedException {
+
+ final String sourceTopic = "test_topic_restore_latest";
+ final String sinkTopic = "test_topic_restore_latest_output";
+ final String payload = "Seatunnel Restore Test Data Latest";
+ final String jobId = "18696753645410";
+
+ // Write 20 initial records before starting the job.
+ for (int i = 0; i < 20; i++) {
+ producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+ }
+ producer.flush();
+
+ long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+                                "/kafka/kafkasource_restore_with_latest_mode.conf", jobId);
+ } catch (Exception e) {
+ log.error("First job execution exception", e);
+ throw new RuntimeException(e);
+ }
+ });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+        // Produce 10 records after job start; latest mode should consume only these 10 initially.
+ for (int i = 0; i < 10; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_additional_" + i).getBytes(),
+ (payload + "_additional").getBytes()));
+ }
+ producer.flush();
+
+ final long expectedSinkAfterFirstRun = 10;
+ Awaitility.await()
+ .pollInterval(2, SECONDS)
+ .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+ container.savepointJob(jobId);
+
+ // Append 15 more records after savepoint.
+ for (int i = 0; i < 15; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_restore_" + i).getBytes(),
+ (payload + "_restore").getBytes()));
+ }
+ producer.flush();
+
+ long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by exactly 25");
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(
+                                "/kafka/kafkasource_restore_with_latest_mode.conf", jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Awaitility.await()
+ .pollDelay(3, SECONDS)
+ .pollInterval(2, SECONDS)
+ .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+ }
+
+    // ---------------------------- TIMESTAMP MODE -----------------------------
+
+ @TestTemplate
+ @DisabledOnContainer(
+ type = {EngineType.SPARK, EngineType.FLINK},
+ value = {})
+    public void testSourceKafkaRestoreWithTimestampMode(TestContainer container)
+ throws IOException, InterruptedException {
+
+ final String sourceTopic = "test_topic_restore_timestamp";
+ final String sinkTopic = "test_topic_restore_timestamp_output";
+ final String payload = "Seatunnel Restore Test Data Timestamp";
+ final String jobId = "18696753645411";
+
+ for (int i = 0; i < 20; i++) {
+ producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+ }
+ producer.flush();
+
+ long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+                                "/kafka/kafkasource_restore_with_timestamp_mode.conf", jobId);
+ } catch (Exception e) {
+ log.error("First job execution exception", e);
+ throw new RuntimeException(e);
+ }
+ });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+ // Produce 10 records after job start.
+ for (int i = 0; i < 10; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_additional_" + i).getBytes(),
+ (payload + "_additional").getBytes()));
+ }
+ producer.flush();
+
+        // Keep original semantics: expected sink count depends on timestamp-based start config.
+ final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+ Awaitility.await()
+ .pollInterval(2, SECONDS)
+ .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun);
+
+ container.savepointJob(jobId);
+
+ // Append 15 more records after savepoint.
+ for (int i = 0; i < 15; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_restore_" + i).getBytes(),
+ (payload + "_restore").getBytes()));
+ }
+ producer.flush();
+
+ long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by exactly 25");
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(
+                                "/kafka/kafkasource_restore_with_timestamp_mode.conf", jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Awaitility.await()
+ .pollDelay(3, SECONDS)
+ .pollInterval(2, SECONDS)
+ .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15);
+ }
+
+    // ------------------------- SPECIFIC OFFSETS MODE -------------------------
+
+ @TestTemplate
+ @DisabledOnContainer(
+ type = {EngineType.SPARK, EngineType.FLINK},
+ value = {})
+    public void testSourceKafkaRestoreWithSpecificOffsetsMode(TestContainer container)
+ throws IOException, InterruptedException {
+
+ final String sourceTopic = "test_topic_restore_specific_offsets";
+ final String sinkTopic = "test_topic_restore_specific_offsets_output";
+ final String payload = "Seatunnel Restore Test Data Specific Offsets";
+ final String jobId = "18696753645412";
+
+ for (int i = 0; i < 20; i++) {
+ producer.send(
+                    new ProducerRecord<>(sourceTopic, ("key_" + i).getBytes(), payload.getBytes()));
+ }
+ producer.flush();
+
+ long srcEndBeforeStart = endOffsetOnP0(sourceTopic);
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+                                "/kafka/kafkasource_restore_with_specific_offsets_mode.conf",
+ jobId);
+ } catch (Exception e) {
+ log.error("First job execution exception", e);
+ throw new RuntimeException(e);
+ }
+ });
+
+        Awaitility.await().pollDelay(5, SECONDS).atMost(1, MINUTES).until(() -> true);
+
+ // Produce 10 records after job start.
+ for (int i = 0; i < 10; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_additional_" + i).getBytes(),
+ (payload + "_additional").getBytes()));
+ }
+ producer.flush();
+
+        // Keep original semantics: start_mode.offsets pins partition 0 to offset 11, so the
+        // first 11 records (offsets 0-10) are skipped.
+ final long expectedSinkAfterFirstRun = srcEndBeforeStart + 10;
+ Awaitility.await()
+ .pollInterval(2, SECONDS)
+ .atMost(2, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun - 11);
+
+ container.savepointJob(jobId);
+
+ // Append 15 more records after savepoint.
+ for (int i = 0; i < 15; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ sourceTopic,
+ ("key_restore_" + i).getBytes(),
+ (payload + "_restore").getBytes()));
+ }
+ producer.flush();
+
+ long srcEndAfterAll = endOffsetOnP0(sourceTopic);
+        Assertions.assertTrue(
+                srcEndAfterAll == srcEndBeforeStart + 25,
+                "Final end offset should advance by exactly 25");
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(
+                                "/kafka/kafkasource_restore_with_specific_offsets_mode.conf", jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Awaitility.await()
+ .pollDelay(3, SECONDS)
+ .pollInterval(2, SECONDS)
+ .atMost(5, MINUTES)
+                .until(() -> visibleCountOnP0(sinkTopic) == expectedSinkAfterFirstRun + 15 - 11);
+ }
+
+ /**
+     * Get visible record count on partition-0: endOffset - beginningOffset (exclusive upper bound).
+ */
+ private long visibleCountOnP0(String topic) {
+        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ c.assign(Collections.singletonList(tp0));
+            long begin = c.beginningOffsets(Collections.singletonList(tp0)).get(tp0);
+ long end = c.endOffsets(Collections.singletonList(tp0)).get(tp0);
+ return end - begin;
+ }
+ }
+
+ /** Get the current end offset (LEO) on partition-0. */
+ private long endOffsetOnP0(String topic) {
+        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ c.assign(Collections.singletonList(tp0));
+ return c.endOffsets(Collections.singletonList(tp0)).get(tp0);
+ }
+ }
+
@TestTemplate
public void testSourceKafkaWithEndTimestamp(TestContainer container)
throws IOException, InterruptedException {
@@ -973,10 +1443,10 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
});
// wait for data written to kafka
Long finalEndOffset = endOffset;
- given().pollDelay(30, TimeUnit.SECONDS)
- .pollInterval(5, TimeUnit.SECONDS)
+ given().pollDelay(30, SECONDS)
+ .pollInterval(5, SECONDS)
.await()
- .atMost(5, TimeUnit.MINUTES)
+ .atMost(5, MINUTES)
.untilAsserted(
() ->
Assertions.assertTrue(
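All four restore tests above share the same savepoint/restore shape and rely on the two offset helpers added in this file; the arithmetic is identical across them, worked through here for the earliest-mode case:

    first run:  sink count = srcEndBeforeStart (20 initial records) + 10 produced after start = 30
    savepoint:  15 more records are appended to the source topic
    restore:    sink count must reach 30 + 15 = 45, i.e. the restored job resumes from committed
                offsets and does not re-read the first 30 records

Latest mode differs only in the first-run expectation (10, because the 20 pre-existing records are skipped), and specific_offsets subtracts the 11 records below the configured start offset.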
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf
new file mode 100644
index 0000000000..b1de6df0fa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_dynamic_partition_discovery.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "STREAMING"
+ parallelism = 2
+ checkpoint.interval = 5000
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_dynamic_partition"
+ plugin_output = "kafka_table"
+ start_mode = latest
+ partition-discovery.interval-millis = 5000
+ consumer.group = "seatunnel-dynamic-partition-test-group"
+ format = json
+ schema = {
+ fields {
+ id = bigint
+ message = string
+ timestamp = bigint
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_dynamic_partition_output"
+ plugin_input = "kafka_table"
+ format = json
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf
new file mode 100644
index 0000000000..ac4c7d9117
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_earliest_mode.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ checkpoint.timeout = 60000
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_earliest"
+ format = text
+ start_mode = earliest
+ consumer.group = "test_restore_group"
+ discovery.interval.millis = 10000
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_earliest_output"
+ format = text
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf
new file mode 100644
index 0000000000..00b7b38c0a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_latest_mode.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ checkpoint.timeout = 60000
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_latest"
+ format = text
+ start_mode = latest
+ consumer.group = "test_restore_latest"
+ discovery.interval.millis = 10000
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_latest_output"
+ format = text
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf
new file mode 100644
index 0000000000..bc929bb24b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_specific_offsets_mode.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ checkpoint.timeout = 60000
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_specific_offsets"
+ format = text
+ start_mode = specific_offsets
+ start_mode.offsets = {
+ test_topic_restore_specific_offsets-0 = 11
+ }
+ consumer.group = "test_restore_specific_offsets"
+ discovery.interval.millis = 10000
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_specific_offsets_output"
+ format = text
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf
new file mode 100644
index 0000000000..f42dae81f5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_restore_with_timestamp_mode.conf
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ checkpoint.timeout = 60000
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_timestamp"
+ format = text
+ start_mode = timestamp
+ start_mode.timestamp = 1738395840000
+ consumer.group = "test_restore_timestamp"
+ discovery.interval.millis = 10000
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_restore_timestamp_output"
+ format = text
+ }
+}
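A note on the fixed start_mode.timestamp above: 1738395840000 ms works out to 2025-02-01T07:44:00Z,

    1738395840000 ms / 1000 = 1738395840 s since the epoch
    1738368000 s            = 2025-02-01T00:00:00Z
    1738395840 - 1738368000 = 27840 s = 7 h 44 min  ->  2025-02-01T07:44:00Z

which predates anything the test produces at run time, so for these runs timestamp mode effectively behaves like earliest and the first run is expected to consume all 30 records (matching expectedSinkAfterFirstRun = srcEndBeforeStart + 10 in the test).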