This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 999cfd606 [Improve][Connector-V2-kafka] Support for dynamic discover
topic & partition in streaming mode (#3125)
999cfd606 is described below
commit 999cfd6069d128b26f19d3ed268a90695f7f567b
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Nov 22 09:35:45 2022 +0800
[Improve][Connector-V2-kafka] Support for dynamic discover topic &
partition in streaming mode (#3125)
[Improve][Connector-V2-kafka] Support for dynamic discover topic &
partition in streaming mode
Co-authored-by: zhouyao <[email protected]>
Co-authored-by: Eric <[email protected]>
---
docs/en/connector-v2/source/kafka.md | 36 +++++++-----
.../connectors/seatunnel/kafka/config/Config.java | 8 +++
.../seatunnel/kafka/source/KafkaSource.java | 10 +++-
.../seatunnel/kafka/source/KafkaSourceFactory.java | 11 ++--
.../seatunnel/kafka/source/KafkaSourceReader.java | 25 +++++---
.../kafka/source/KafkaSourceSplitEnumerator.java | 66 +++++++++++++++++++---
6 files changed, 119 insertions(+), 37 deletions(-)
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index a0c660b26..80a1cba60 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -17,20 +17,21 @@ Source connector for Apache Kafka.
## Options
-| name | type | required | default value |
-|----------------------|---------| -------- |--------------------------|
-| topic | String | yes | - |
-| bootstrap.servers | String | yes | - |
-| pattern | Boolean | no | false |
-| consumer.group | String | no | SeaTunnel-Consumer-Group |
-| commit_on_checkpoint | Boolean | no | true |
-| kafka.* | String | no | - |
-| common-options | config | no | - |
-| schema | | no | - |
-| format | String | no | json |
-| start_mode | String | no | group_offsets |
-| start_mode.offsets | | no | |
-| start_mode.timestamp | Long | no | |
+| name | type | required | default value
|
+|-------------------------------------|---------| --------
|--------------------------|
+| topic | String | yes | -
|
+| bootstrap.servers | String | yes | -
|
+| pattern | Boolean | no | false
|
+| consumer.group | String | no |
SeaTunnel-Consumer-Group |
+| commit_on_checkpoint | Boolean | no | true
|
+| kafka.* | String | no | -
|
+| common-options | config | no | -
|
+| schema | | no | -
|
+| format | String | no | json
|
+| start_mode | String | no | group_offsets
|
+| start_mode.offsets | | no |
|
+| start_mode.timestamp | Long | no |
|
+| partition-discovery.interval-millis | Long | no | -1
|
### topic [string]
@@ -52,6 +53,10 @@ If `pattern` is set to `true`,the regular expression for a
pattern of topic name
If true the consumer's offset will be periodically committed in the background.
+### partition-discovery.interval-millis [long]
+
+The interval for dynamically discovering topics and partitions.
+
### kafka.* [string]
In addition to the above necessary parameters that must be specified by the
`Kafka consumer` client, users can also specify multiple `consumer` client
non-mandatory parameters, covering [all consumer parameters specified in the
official Kafka
document](https://kafka.apache.org/documentation.html#consumerconfigs).
@@ -147,4 +152,5 @@ source {
### Next Version
-- [Improve] Support setting read starting offset or time at startup config
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
\ No newline at end of file
+- [Improve] Support setting read starting offset or time at startup config
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
+- [Improve] Support for dynamic discover topic & partition in streaming mode
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 6030bfa39..d4f982bca 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -122,4 +122,12 @@ public class Config {
.noDefaultValue()
.withDescription("The offset required for consumption mode to be
specific_offsets.");
+ /**
+ * Configuration key to define the consumer's partition discovery
interval, in milliseconds.
+ */
+ public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
Options.key("partition-discovery.interval-millis")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription("The interval for dynamically discovering topics and
partitions.");
+
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 79d1328b4..01ab14cc6 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
@@ -72,6 +73,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private SeaTunnelRowType typeInfo;
private JobContext jobContext;
+ private long discoveryIntervalMillis =
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
@Override
public Boundedness getBoundedness() {
@@ -140,6 +142,10 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
}
}
+ if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
+ this.discoveryIntervalMillis =
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
+ }
+
TypesafeConfigUtils.extractSubConfig(config, "kafka.",
false).entrySet().forEach(e -> {
this.metadata.getProperties().put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
});
@@ -159,12 +165,12 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext) throws Exception {
- return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext);
+ return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext, discoveryIntervalMillis);
}
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext, KafkaSourceState checkpointState) throws Exception {
- return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext, checkpointState);
+ return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext, checkpointState, discoveryIntervalMillis);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 222a48bde..747a3542f 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -37,10 +37,11 @@ public class KafkaSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
- .optional(Config.PATTERN, Config.CONSUMER_GROUP,
Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
Config.FORMAT)
- .conditional(Condition.of(Config.START_MODE,
StartMode.TIMESTAMP), Config.START_MODE_TIMESTAMP)
- .conditional(Condition.of(Config.START_MODE,
StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
- .build();
+ .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+ .optional(Config.PATTERN, Config.CONSUMER_GROUP,
Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
+ Config.FORMAT, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
+ .conditional(Condition.of(Config.START_MODE, StartMode.TIMESTAMP),
Config.START_MODE_TIMESTAMP)
+ .conditional(Condition.of(Config.START_MODE,
StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
+ .build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 24252ce68..7eb68237f 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -41,9 +41,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
@Slf4j
@@ -56,11 +56,14 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
private final ConsumerMetadata metadata;
private final Set<KafkaSourceSplit> sourceSplits;
private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
- private final ConcurrentMap<TopicPartition, KafkaSourceSplit>
sourceSplitMap;
private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
private final ExecutorService executorService;
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
+
+ private volatile boolean running = false;
+
KafkaSourceReader(ConsumerMetadata metadata,
DeserializationSchema<SeaTunnelRow>
deserializationSchema,
SourceReader.Context context) {
@@ -69,10 +72,10 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
this.sourceSplits = new HashSet<>();
this.deserializationSchema = deserializationSchema;
this.consumerThreadMap = new ConcurrentHashMap<>();
- this.sourceSplitMap = new ConcurrentHashMap<>();
this.checkpointOffsetMap = new ConcurrentHashMap<>();
this.executorService = Executors.newCachedThreadPool(
r -> new Thread(r, "Kafka Source Data Consumer"));
+ pendingPartitionsQueue = new LinkedBlockingQueue<>();
}
@Override
@@ -88,10 +91,14 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (sourceSplitMap.isEmpty()) {
+ if (!running) {
Thread.sleep(THREAD_WAIT_TIME);
return;
}
+
+ while (pendingPartitionsQueue.size() != 0) {
+ sourceSplits.add(pendingPartitionsQueue.poll());
+ }
sourceSplits.forEach(sourceSplit ->
consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
executorService.submit(thread);
@@ -157,9 +164,13 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
@Override
public void addSplits(List<KafkaSourceSplit> splits) {
- sourceSplits.addAll(splits);
- sourceSplits.forEach(split -> {
- sourceSplitMap.put(split.getTopicPartition(), split);
+ running = true;
+ splits.forEach(s -> {
+ try {
+ pendingPartitionsQueue.put(s);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
});
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 59c1aba7f..71760b96b 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -38,6 +38,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -48,10 +52,13 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
private final ConsumerMetadata metadata;
private final Context<KafkaSourceSplit> context;
+ private long discoveryIntervalMillis;
private AdminClient adminClient;
private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
+ private ScheduledExecutorService executor;
+ private ScheduledFuture scheduledFuture;
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context) {
this.metadata = metadata;
@@ -65,20 +72,43 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
this(metadata, context);
}
+ KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context,
+ long discoveryIntervalMillis) {
+ this(metadata, context);
+ this.discoveryIntervalMillis = discoveryIntervalMillis;
+ }
+
+ KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context,
+ KafkaSourceState sourceState, long
discoveryIntervalMillis) {
+ this(metadata, context, sourceState);
+ this.discoveryIntervalMillis = discoveryIntervalMillis;
+ }
+
@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
+ if (discoveryIntervalMillis > 0) {
+ this.executor = Executors.newScheduledThreadPool(1, runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ thread.setName("kafka-partition-dynamic-discovery");
+ return thread;
+ });
+ this.scheduledFuture = executor.scheduleWithFixedDelay(
+ () -> {
+ try {
+ discoverySplits();
+ } catch (Exception e) {
+ log.error("Dynamic discovery failure:", e);
+ }
+ }, discoveryIntervalMillis, discoveryIntervalMillis,
TimeUnit.MILLISECONDS
+ );
+ }
}
@Override
public void run() throws ExecutionException, InterruptedException {
- getTopicInfo().forEach(split -> {
- if (!assignedSplit.containsKey(split.getTopicPartition())) {
- if (!pendingSplit.containsKey(split.getTopicPartition())) {
- pendingSplit.put(split.getTopicPartition(), split);
- }
- }
- });
+ fetchPendingPartitionSplit();
setPartitionStartOffset();
assignSplit();
}
@@ -117,6 +147,12 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
if (this.adminClient != null) {
adminClient.close();
}
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
}
@Override
@@ -197,7 +233,7 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
}).collect(Collectors.toSet());
}
- private void assignSplit() {
+ private synchronized void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
@@ -260,4 +296,18 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
.get();
}
+ private void discoverySplits() throws ExecutionException,
InterruptedException {
+ fetchPendingPartitionSplit();
+ assignSplit();
+ }
+
+ private void fetchPendingPartitionSplit() throws ExecutionException,
InterruptedException {
+ getTopicInfo().forEach(split -> {
+ if (!assignedSplit.containsKey(split.getTopicPartition())) {
+ if (!pendingSplit.containsKey(split.getTopicPartition())) {
+ pendingSplit.put(split.getTopicPartition(), split);
+ }
+ }
+ });
+ }
}