This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a0eeeb9b62 [Fix][Kafka] Fix in kafka streaming mode can not read
incremental data (#7871)
a0eeeb9b62 is described below
commit a0eeeb9b6234ce842f25395e6f5524eef53fb1f5
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Sat Nov 16 11:17:37 2024 +0800
[Fix][Kafka] Fix in kafka streaming mode can not read incremental data
(#7871)
---
docs/en/connector-v2/source/kafka.md | 1 +
.../seatunnel/kafka/source/KafkaSource.java | 11 +-
.../kafka/source/KafkaSourceSplitEnumerator.java | 34 ++++-
.../admin/KafkaSourceSplitEnumeratorTest.java | 156 +++++++++++++++++++++
.../source/KafkaSourceSplitEnumeratorTest.java | 74 ----------
5 files changed, 194 insertions(+), 82 deletions(-)
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 90c183c2c1..c0ed66186b 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
### Simple
> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints
> it to the client. And if you have not yet installed and deployed SeaTunnel,
> you need to follow the instructions in [Install
> SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy
> SeaTunnel. And then follow the instructions in [Quick Start With SeaTunn
> [...]
+> In batch mode, while the enumerator plans splits, it fetches the latest
offset of each partition and uses it as that split's stop offset.
```hocon
# Defining the runtime environment
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 5688fde5b6..0ff99807f2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -104,7 +104,11 @@ public class KafkaSource
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
createEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext)
{
- return new KafkaSourceSplitEnumerator(kafkaSourceConfig,
enumeratorContext, null);
+ return new KafkaSourceSplitEnumerator(
+ kafkaSourceConfig,
+ enumeratorContext,
+ null,
+ getBoundedness() == Boundedness.UNBOUNDED);
}
@Override
@@ -112,7 +116,10 @@ public class KafkaSource
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
- kafkaSourceConfig, enumeratorContext, checkpointState);
+ kafkaSourceConfig,
+ enumeratorContext,
+ checkpointState,
+ getBoundedness() == Boundedness.UNBOUNDED);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 06ce4565c3..6d6c1ca96f 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -70,10 +70,13 @@ public class KafkaSourceSplitEnumerator
private final Map<String, TablePath> topicMappingTablePathMap = new
HashMap<>();
+ private boolean isStreamingMode;
+
KafkaSourceSplitEnumerator(
KafkaSourceConfig kafkaSourceConfig,
Context<KafkaSourceSplit> context,
- KafkaSourceState sourceState) {
+ KafkaSourceState sourceState,
+ boolean isStreamingMode) {
this.kafkaSourceConfig = kafkaSourceConfig;
this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
this.context = context;
@@ -81,10 +84,11 @@ public class KafkaSourceSplitEnumerator
this.pendingSplit = new HashMap<>();
this.adminClient =
initAdminClient(this.kafkaSourceConfig.getProperties());
this.discoveryIntervalMillis =
kafkaSourceConfig.getDiscoveryIntervalMillis();
+ this.isStreamingMode = isStreamingMode;
}
@VisibleForTesting
- protected KafkaSourceSplitEnumerator(
+ public KafkaSourceSplitEnumerator(
AdminClient adminClient,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -97,6 +101,16 @@ public class KafkaSourceSplitEnumerator
this.assignedSplit = assignedSplit;
}
+ @VisibleForTesting
+ public KafkaSourceSplitEnumerator(
+ AdminClient adminClient,
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+ boolean isStreamingMode) {
+ this(adminClient, pendingSplit, assignedSplit);
+ this.isStreamingMode = isStreamingMode;
+ }
+
@Override
public void open() {
if (discoveryIntervalMillis > 0) {
@@ -204,7 +218,7 @@ public class KafkaSourceSplitEnumerator
private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(
List<KafkaSourceSplit> splits) {
try {
- Map<TopicPartition, Long> listOffsets =
+ Map<TopicPartition, Long> latestOffsets =
listOffsets(
splits.stream()
.map(KafkaSourceSplit::getTopicPartition)
@@ -214,7 +228,10 @@ public class KafkaSourceSplitEnumerator
splits.forEach(
split -> {
split.setStartOffset(split.getEndOffset() + 1);
-
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
+ split.setEndOffset(
+ isStreamingMode
+ ? Long.MAX_VALUE
+ :
latestOffsets.get(split.getTopicPartition()));
});
return splits.stream()
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
@@ -305,7 +322,10 @@ public class KafkaSourceSplitEnumerator
// Obtain the corresponding topic TablePath from
kafka topic
TablePath tablePath =
topicMappingTablePathMap.get(partition.topic());
KafkaSourceSplit split = new
KafkaSourceSplit(tablePath, partition);
-
split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
+ split.setEndOffset(
+ isStreamingMode
+ ? Long.MAX_VALUE
+ : latestOffsets.get(partition));
return split;
})
.collect(Collectors.toSet());
@@ -344,6 +364,7 @@ public class KafkaSourceSplitEnumerator
private Map<TopicPartition, Long> listOffsets(
Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
throws ExecutionException, InterruptedException {
+
Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
partitions.stream()
.collect(Collectors.toMap(partition -> partition, __
-> offsetSpec));
@@ -391,7 +412,8 @@ public class KafkaSourceSplitEnumerator
assignSplit();
}
- private void fetchPendingPartitionSplit() throws ExecutionException,
InterruptedException {
+ @VisibleForTesting
+ public void fetchPendingPartitionSplit() throws ExecutionException,
InterruptedException {
getTopicInfo()
.forEach(
split -> {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..00e059ecfe
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+class KafkaSourceSplitEnumeratorTest {
+
+ AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+ // prepare
+ TopicPartition partition = new TopicPartition("test", 0);
+
+ @BeforeEach
+ void init() {
+
+ Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
+ .thenReturn(
+ new ListOffsetsResult(
+ new HashMap<
+ TopicPartition,
+
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
+ {
+ put(
+ partition,
+ KafkaFuture.completedFuture(
+ new
ListOffsetsResult.ListOffsetsResultInfo(
+ 0, 0,
Optional.of(0))));
+ }
+ }));
+
Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
+ .thenReturn(
+ DescribeTopicsResult.ofTopicNames(
+ new HashMap<String,
KafkaFuture<TopicDescription>>() {
+ {
+ put(
+ partition.topic(),
+ KafkaFuture.completedFuture(
+ new TopicDescription(
+
partition.topic(),
+ false,
+
Collections.singletonList(
+ new
TopicPartitionInfo(
+
0,
+
null,
+
Collections
+
.emptyList(),
+
Collections
+
.emptyList())))));
+ }
+ }));
+ }
+
+ @Test
+ void addSplitsBack() {
+ // test
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+ new HashMap<TopicPartition, KafkaSourceSplit>() {
+ {
+ put(partition, new KafkaSourceSplit(null, partition));
+ }
+ };
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+ List<KafkaSourceSplit> splits = Arrays.asList(new
KafkaSourceSplit(null, partition));
+ KafkaSourceSplitEnumerator enumerator =
+ new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit);
+ enumerator.addSplitsBack(splits, 1);
+ Assertions.assertTrue(pendingSplit.size() == splits.size());
+ Assertions.assertNull(assignedSplit.get(partition));
+ Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+ }
+
+ @Test
+ void addStreamingSplitsBack() {
+ // test
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+ new HashMap<TopicPartition, KafkaSourceSplit>() {
+ {
+ put(partition, new KafkaSourceSplit(null, partition));
+ }
+ };
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+ List<KafkaSourceSplit> splits =
+ Collections.singletonList(new KafkaSourceSplit(null,
partition));
+ KafkaSourceSplitEnumerator enumerator =
+ new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit, true);
+ enumerator.addSplitsBack(splits, 1);
+ Assertions.assertEquals(pendingSplit.size(), splits.size());
+ Assertions.assertNull(assignedSplit.get(partition));
+ Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() ==
Long.MAX_VALUE);
+ }
+
+ @Test
+ void addStreamingSplits() throws ExecutionException, InterruptedException {
+ // test
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+ new HashMap<TopicPartition, KafkaSourceSplit>();
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+ List<KafkaSourceSplit> splits =
+ Collections.singletonList(new KafkaSourceSplit(null,
partition));
+ KafkaSourceSplitEnumerator enumerator =
+ new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit, true);
+ enumerator.fetchPendingPartitionSplit();
+ Assertions.assertEquals(pendingSplit.size(), splits.size());
+ Assertions.assertNotNull(pendingSplit.get(partition));
+ Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() ==
Long.MAX_VALUE);
+ }
+
+ @Test
+ void addplits() throws ExecutionException, InterruptedException {
+ // test
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+ new HashMap<TopicPartition, KafkaSourceSplit>();
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+ List<KafkaSourceSplit> splits =
+ Collections.singletonList(new KafkaSourceSplit(null,
partition));
+ KafkaSourceSplitEnumerator enumerator =
+ new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit, false);
+ enumerator.fetchPendingPartitionSplit();
+ Assertions.assertEquals(pendingSplit.size(), splits.size());
+ Assertions.assertNotNull(pendingSplit.get(partition));
+ Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
deleted file mode 100644
index 6a8de812d3..0000000000
---
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-class KafkaSourceSplitEnumeratorTest {
-
- @Test
- void addSplitsBack() {
- // prepare
- TopicPartition partition = new TopicPartition("test", 0);
-
- AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
- Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
- .thenReturn(
- new ListOffsetsResult(
- new HashMap<
- TopicPartition,
-
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
- {
- put(
- partition,
- KafkaFuture.completedFuture(
- new
ListOffsetsResult.ListOffsetsResultInfo(
- 0, 0,
Optional.of(0))));
- }
- }));
-
- // test
- Map<TopicPartition, KafkaSourceSplit> assignedSplit =
- new HashMap<TopicPartition, KafkaSourceSplit>() {
- {
- put(partition, new KafkaSourceSplit(null, partition));
- }
- };
- Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
- List<KafkaSourceSplit> splits = Arrays.asList(new
KafkaSourceSplit(null, partition));
- KafkaSourceSplitEnumerator enumerator =
- new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit);
- enumerator.addSplitsBack(splits, 1);
- Assertions.assertTrue(pendingSplit.size() == splits.size());
- Assertions.assertNull(assignedSplit.get(partition));
- }
-}