This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 10598b6aec [Improve][Kafka] kafka source refactored some reader read
logic (#6408)
10598b6aec is described below
commit 10598b6aecca27988fcda1762bd27cbd9896bdb3
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Sep 5 10:22:42 2024 +0800
[Improve][Kafka] kafka source refactored some reader read logic (#6408)
---
.../common/utils/TemporaryClassLoaderContext.java | 53 +++
.../seatunnel/kafka/source/ConsumerMetadata.java | 1 -
.../kafka/source/KafkaConsumerThread.java | 111 -----
.../kafka/source/KafkaPartitionSplitReader.java | 458 +++++++++++++++++++++
.../seatunnel/kafka/source/KafkaRecordEmitter.java | 112 +++++
.../seatunnel/kafka/source/KafkaSource.java | 33 +-
.../seatunnel/kafka/source/KafkaSourceConfig.java | 3 +-
.../seatunnel/kafka/source/KafkaSourceReader.java | 343 +++++----------
.../kafka/source/KafkaSourceSplitEnumerator.java | 8 +-
.../kafka/source/KafkaSourceSplitState.java | 45 ++
.../source/fetch/KafkaSourceFetcherManager.java | 99 +++++
.../connector-kafka-e2e/pom.xml | 6 +
.../multiFormatIT/kafka_multi_source_to_pg.conf | 2 +-
13 files changed, 918 insertions(+), 356 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
new file mode 100644
index 0000000000..d5ddd48106
--- /dev/null
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.common.utils;
+
+public final class TemporaryClassLoaderContext implements AutoCloseable {
+
+ /**
+ * Sets the context class loader to the given ClassLoader and returns a
resource that sets it
+ * back to the current context ClassLoader when the resource is closed.
+ *
+ * <pre>{@code
+ * try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(classloader)) {
+ * // code that needs the context class loader
+ * }
+ * }</pre>
+ */
+ public static TemporaryClassLoaderContext of(ClassLoader cl) {
+ final Thread t = Thread.currentThread();
+ final ClassLoader original = t.getContextClassLoader();
+
+ t.setContextClassLoader(cl);
+
+ return new TemporaryClassLoaderContext(t, original);
+ }
+
+ private final Thread thread;
+
+ private final ClassLoader originalContextClassLoader;
+
+ private TemporaryClassLoaderContext(Thread thread, ClassLoader
originalContextClassLoader) {
+ this.thread = thread;
+ this.originalContextClassLoader = originalContextClassLoader;
+ }
+
+ @Override
+ public void close() {
+ thread.setContextClassLoader(originalContextClassLoader);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index 8ce9dbb870..c129fbdc93 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -37,7 +37,6 @@ public class ConsumerMetadata implements Serializable {
private String topic;
private boolean isPattern = false;
private Properties properties;
- private String consumerGroup;
private StartMode startMode = StartMode.GROUP_OFFSETS;
private Map<TopicPartition, Long> specificStartOffsets;
private Long startOffsetsTimestamp;
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
deleted file mode 100644
index 99b6baeaf1..0000000000
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
-public class KafkaConsumerThread implements Runnable {
-
- private final KafkaConsumer<byte[], byte[]> consumer;
- private static final String CLIENT_ID_PREFIX = "seatunnel";
- private final ConsumerMetadata metadata;
-
- private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>>
tasks;
-
- public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig,
ConsumerMetadata metadata) {
- this.metadata = metadata;
- this.tasks = new LinkedBlockingQueue<>();
- this.consumer =
- initConsumer(
- kafkaSourceConfig.getBootstrap(),
- metadata.getConsumerGroup(),
- kafkaSourceConfig.getProperties(),
- !kafkaSourceConfig.isCommitOnCheckpoint());
- }
-
- @Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- Consumer<KafkaConsumer<byte[], byte[]>> task =
tasks.poll(1, TimeUnit.SECONDS);
- if (task != null) {
- task.accept(consumer);
- }
- } catch (Exception e) {
- throw new KafkaConnectorException(
- KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR,
e);
- }
- }
- } finally {
- try {
- if (consumer != null) {
- consumer.close();
- }
- } catch (Throwable t) {
- throw new
KafkaConnectorException(KafkaConnectorErrorCode.CONSUMER_CLOSE_FAILED, t);
- }
- }
- }
-
- public LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>>
getTasks() {
- return tasks;
- }
-
- private KafkaConsumer<byte[], byte[]> initConsumer(
- String bootstrapServer,
- String consumerGroup,
- Properties properties,
- boolean autoCommit) {
- Properties props = new Properties();
- properties.forEach(
- (key, value) -> props.setProperty(String.valueOf(key),
String.valueOf(value)));
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- if (this.metadata.getProperties().get("client.id") == null) {
- props.setProperty(
- ConsumerConfig.CLIENT_ID_CONFIG,
- CLIENT_ID_PREFIX + "-consumer-" + this.hashCode());
- } else {
- props.setProperty(
- ConsumerConfig.CLIENT_ID_CONFIG,
- this.metadata.getProperties().get("client.id").toString());
- }
- props.setProperty(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(autoCommit));
-
- // Disable auto create topics feature
- props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
- return new KafkaConsumer<>(props);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
new file mode 100644
index 0000000000..8bca82999c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.common.utils.TemporaryClassLoaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class KafkaPartitionSplitReader
+ implements SplitReader<ConsumerRecord<byte[], byte[]>,
KafkaSourceSplit> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
+
+ private static final long POLL_TIMEOUT = 10000L;
+ private static final String CLIENT_ID_PREFIX = "seatunnel";
+ private final KafkaSourceConfig kafkaSourceConfig;
+
+ private final KafkaConsumer<byte[], byte[]> consumer;
+
+ private final Map<TopicPartition, Long> stoppingOffsets;
+
+ private final String groupId;
+
+ private final Set<String> emptySplits = new HashSet<>();
+
+ public KafkaPartitionSplitReader(
+ KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context)
{
+ this.kafkaSourceConfig = kafkaSourceConfig;
+ this.consumer = initConsumer(kafkaSourceConfig,
context.getIndexOfSubtask());
+ this.stoppingOffsets = new HashMap<>();
+ this.groupId =
+
kafkaSourceConfig.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ }
+
+ @Override
+ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws
IOException {
+ ConsumerRecords<byte[], byte[]> consumerRecords;
+ try {
+ consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+ } catch (WakeupException | IllegalStateException e) {
+ // IllegalStateException will be thrown if the consumer is not
assigned any partitions.
+ // This happens if all assigned partitions are invalid or empty
(starting offset >=
+ // stopping offset). We just mark empty partitions as finished and
return an empty
+ // record container, and this consumer will be closed by
SplitFetcherManager.
+ KafkaPartitionSplitRecords recordsBySplits =
+ new KafkaPartitionSplitRecords(ConsumerRecords.empty());
+ markEmptySplitsAsFinished(recordsBySplits);
+ return recordsBySplits;
+ }
+ KafkaPartitionSplitRecords recordsBySplits =
+ new KafkaPartitionSplitRecords(consumerRecords);
+ List<TopicPartition> finishedPartitions = new ArrayList<>();
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ long stoppingOffset = getStoppingOffset(tp);
+ final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
+ consumerRecords.records(tp);
+
+ if (recordsFromPartition.size() > 0) {
+ final ConsumerRecord<byte[], byte[]> lastRecord =
+ recordsFromPartition.get(recordsFromPartition.size() -
1);
+
+ // After processing a record with offset of "stoppingOffset -
1", the split reader
+ // should not continue fetching because the record with
stoppingOffset may not
+ // exist. Keep polling will just block forever.
+ if (lastRecord.offset() >= stoppingOffset - 1) {
+ recordsBySplits.setPartitionStoppingOffset(tp,
stoppingOffset);
+ finishSplitAtRecord(
+ tp,
+ stoppingOffset,
+ lastRecord.offset(),
+ finishedPartitions,
+ recordsBySplits);
+ }
+ }
+ }
+
+ markEmptySplitsAsFinished(recordsBySplits);
+
+ if (!finishedPartitions.isEmpty()) {
+ unassignPartitions(finishedPartitions);
+ }
+
+ return recordsBySplits;
+ }
+
+ private void finishSplitAtRecord(
+ TopicPartition tp,
+ long stoppingOffset,
+ long currentOffset,
+ List<TopicPartition> finishedPartitions,
+ KafkaPartitionSplitRecords recordsBySplits) {
+ LOG.debug(
+ "{} has reached stopping offset {}, current offset is {}",
+ tp,
+ stoppingOffset,
+ currentOffset);
+ finishedPartitions.add(tp);
+ recordsBySplits.addFinishedSplit(tp.toString());
+ }
+
+ private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords
recordsBySplits) {
+ // Some splits are discovered as empty when handling split additions.
These splits should be
+ // added to finished splits to clean up states in split fetcher and
source reader.
+ if (!emptySplits.isEmpty()) {
+ recordsBySplits.finishedSplits.addAll(emptySplits);
+ emptySplits.clear();
+ }
+ }
+
+ @Override
+ public void handleSplitsChanges(SplitsChange<KafkaSourceSplit>
splitsChange) {
+ // Get all the partition assignments and stopping offsets.
+ if (!(splitsChange instanceof SplitsAddition)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The SplitChange type of %s is not supported.",
+ splitsChange.getClass()));
+ }
+
+ // Assignment.
+ List<TopicPartition> newPartitionAssignments = new ArrayList<>();
+ // Starting offsets.
+ Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new
HashMap<>();
+ List<TopicPartition> partitionsStartingFromEarliest = new
ArrayList<>();
+ List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
+ // Stopping offsets.
+ List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
+
+ // Parse the starting and stopping offsets.
+ splitsChange
+ .splits()
+ .forEach(
+ s -> {
+ newPartitionAssignments.add(s.getTopicPartition());
+ parseStartingOffsets(
+ s,
+ partitionsStartingFromEarliest,
+ partitionsStartingFromLatest,
+ partitionsStartingFromSpecifiedOffsets);
+ parseStoppingOffsets(s,
partitionsStoppingAtLatest);
+ });
+
+ // Assign new partitions.
+ newPartitionAssignments.addAll(consumer.assignment());
+ consumer.assign(newPartitionAssignments);
+
+        // Seek on the newly assigned partitions to their starting offsets.
+ seekToStartingOffsets(
+ partitionsStartingFromEarliest,
+ partitionsStartingFromLatest,
+ partitionsStartingFromSpecifiedOffsets);
+ // Setup the stopping offsets.
+ acquireAndSetStoppingOffsets(partitionsStoppingAtLatest);
+
+ // After acquiring the starting and stopping offsets, remove the empty
splits if necessary.
+ removeEmptySplits();
+
+ maybeLogSplitChangesHandlingResult(splitsChange);
+ }
+
+ private void
maybeLogSplitChangesHandlingResult(SplitsChange<KafkaSourceSplit> splitsChange)
{
+ if (LOG.isDebugEnabled()) {
+ StringJoiner splitsInfo = new StringJoiner(",");
+ Set<TopicPartition> assginment = consumer.assignment();
+ for (KafkaSourceSplit split : splitsChange.splits()) {
+ if (!assginment.contains(split.getTopicPartition())) {
+ continue;
+ }
+
+ long startingOffset =
+ retryOnWakeup(
+ () ->
consumer.position(split.getTopicPartition()),
+ "logging starting position");
+ long stoppingOffset =
getStoppingOffset(split.getTopicPartition());
+ splitsInfo.add(
+ String.format(
+ "[%s, start:%d, stop: %d]",
+ split.getTopicPartition(), startingOffset,
stoppingOffset));
+ }
+ LOG.debug("SplitsChange handling result: {}", splitsInfo);
+ }
+ }
+
+ private void removeEmptySplits() {
+ List<TopicPartition> emptyPartitions = new ArrayList<>();
+        // A split is empty if its starting position already reached its stopping offset.
+ for (TopicPartition tp : consumer.assignment()) {
+ if (retryOnWakeup(
+ () -> consumer.position(tp),
+ "getting starting offset to check if split is
empty")
+ >= getStoppingOffset(tp)) {
+ emptyPartitions.add(tp);
+ }
+ }
+ if (!emptyPartitions.isEmpty()) {
+ LOG.debug(
+ "These assigning splits are empty and will be marked as
finished in later fetch: {}",
+ emptyPartitions);
+ // Add empty partitions to empty split set for later cleanup in
fetch()
+ emptySplits.addAll(
+ emptyPartitions.stream()
+ .map(TopicPartition::toString)
+ .collect(Collectors.toSet()));
+ // Un-assign partitions from Kafka consumer
+ unassignPartitions(emptyPartitions);
+ }
+ }
+
+ private void unassignPartitions(Collection<TopicPartition>
partitionsToUnassign) {
+ Collection<TopicPartition> newAssignment = new
HashSet<>(consumer.assignment());
+ newAssignment.removeAll(partitionsToUnassign);
+ consumer.assign(newAssignment);
+ }
+
+ private void acquireAndSetStoppingOffsets(List<TopicPartition>
partitionsStoppingAtLatest) {
+ Map<TopicPartition, Long> endOffset =
consumer.endOffsets(partitionsStoppingAtLatest);
+ stoppingOffsets.putAll(endOffset);
+ }
+
+ private void seekToStartingOffsets(
+ List<TopicPartition> partitionsStartingFromEarliest,
+ List<TopicPartition> partitionsStartingFromLatest,
+ Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+
+ if (!partitionsStartingFromEarliest.isEmpty()) {
+ LOG.trace("Seeking starting offsets to beginning: {}",
partitionsStartingFromEarliest);
+ consumer.seekToBeginning(partitionsStartingFromEarliest);
+ }
+
+ if (!partitionsStartingFromLatest.isEmpty()) {
+ LOG.trace("Seeking starting offsets to end: {}",
partitionsStartingFromLatest);
+ consumer.seekToEnd(partitionsStartingFromLatest);
+ }
+
+ if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
+ LOG.trace(
+ "Seeking starting offsets to specified offsets: {}",
+ partitionsStartingFromSpecifiedOffsets);
+ partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
+ }
+ }
+
+ private void parseStoppingOffsets(
+ KafkaSourceSplit split, List<TopicPartition>
partitionsStoppingAtLatest) {
+ TopicPartition tp = split.getTopicPartition();
+ if (split.getEndOffset() >= 0) {
+ stoppingOffsets.put(tp, split.getEndOffset());
+ } else {
+ partitionsStoppingAtLatest.add(tp);
+ }
+ }
+
+ private long getStoppingOffset(TopicPartition tp) {
+ return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
+ }
+
+ private void parseStartingOffsets(
+ KafkaSourceSplit split,
+ List<TopicPartition> partitionsStartingFromEarliest,
+ List<TopicPartition> partitionsStartingFromLatest,
+ Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+ TopicPartition tp = split.getTopicPartition();
+ // Parse starting offsets.
+ ConsumerMetadata metadata =
kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
+ if (metadata.getStartMode() == StartMode.EARLIEST) {
+ partitionsStartingFromEarliest.add(tp);
+ } else if (metadata.getStartMode() == StartMode.LATEST) {
+ partitionsStartingFromLatest.add(tp);
+ } else if (metadata.getStartMode() == StartMode.GROUP_OFFSETS) {
+ // Do nothing here, the consumer will first try to get the
committed offsets of
+ // these partitions by default.
+ } else {
+ partitionsStartingFromSpecifiedOffsets.put(tp,
split.getStartOffset());
+ }
+ }
+
+ @Override
+ public void wakeUp() {
+ consumer.wakeup();
+ }
+
+ @Override
+ public void close() throws Exception {
+ consumer.close();
+ }
+
+ public void notifyCheckpointComplete(
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+ OffsetCommitCallback offsetCommitCallback) {
+ consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+ }
+
+ private KafkaConsumer<byte[], byte[]> initConsumer(
+ KafkaSourceConfig kafkaSourceConfig, int subtaskId) {
+
+ try (TemporaryClassLoaderContext ignored =
+
TemporaryClassLoaderContext.of(kafkaSourceConfig.getClass().getClassLoader())) {
+ Properties props = new Properties();
+ kafkaSourceConfig
+ .getProperties()
+ .forEach(
+ (key, value) ->
+ props.setProperty(String.valueOf(key),
String.valueOf(value)));
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
kafkaSourceConfig.getConsumerGroup());
+ props.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaSourceConfig.getBootstrap());
+ if (this.kafkaSourceConfig.getProperties().get("client.id") ==
null) {
+ props.setProperty(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ CLIENT_ID_PREFIX + "-consumer-" + subtaskId);
+ } else {
+ props.setProperty(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+
this.kafkaSourceConfig.getProperties().get("client.id").toString()
+ + "-"
+ + subtaskId);
+ }
+ props.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));
+
+ // Disable auto create topics feature
+ props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
+ return new KafkaConsumer<>(props);
+ }
+ }
+
+ private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+ try {
+ return consumerCall.get();
+ } catch (WakeupException we) {
+ LOG.info(
+ "Caught WakeupException while executing Kafka consumer
call for {}. Will retry the consumer call.",
+ description);
+ return consumerCall.get();
+ }
+ }
+
+ private static class KafkaPartitionSplitRecords
+ implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final Map<TopicPartition, Long> stoppingOffsets = new
HashMap<>();
+ private final ConsumerRecords<byte[], byte[]> consumerRecords;
+ private final Iterator<TopicPartition> splitIterator;
+ private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+ private TopicPartition currentTopicPartition;
+ private Long currentSplitStoppingOffset;
+
+ private KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]>
consumerRecords) {
+ this.consumerRecords = consumerRecords;
+ this.splitIterator = consumerRecords.partitions().iterator();
+ }
+
+ private void setPartitionStoppingOffset(
+ TopicPartition topicPartition, long stoppingOffset) {
+ stoppingOffsets.put(topicPartition, stoppingOffset);
+ }
+
+ private void addFinishedSplit(String splitId) {
+ finishedSplits.add(splitId);
+ }
+
+ @Nullable @Override
+ public String nextSplit() {
+ if (splitIterator.hasNext()) {
+ currentTopicPartition = splitIterator.next();
+ recordIterator =
consumerRecords.records(currentTopicPartition).iterator();
+ currentSplitStoppingOffset =
+ stoppingOffsets.getOrDefault(currentTopicPartition,
Long.MAX_VALUE);
+ return currentTopicPartition.toString();
+ } else {
+ currentTopicPartition = null;
+ recordIterator = null;
+ currentSplitStoppingOffset = null;
+ return null;
+ }
+ }
+
+ @Nullable @Override
+ public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
+ Preconditions.checkNotNull(
+ currentTopicPartition,
+ "Make sure nextSplit() did not return null before "
+ + "iterate over the records split.");
+ if (recordIterator.hasNext()) {
+ final ConsumerRecord<byte[], byte[]> record =
recordIterator.next();
+ // Only emit records before stopping offset
+ if (record.offset() < currentSplitStoppingOffset) {
+ return record;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
new file mode 100644
index 0000000000..6593137aff
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+import
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class KafkaRecordEmitter
+ implements RecordEmitter<
+ ConsumerRecord<byte[], byte[]>, SeaTunnelRow,
KafkaSourceSplitState> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(KafkaRecordEmitter.class);
+ private final Map<TablePath, ConsumerMetadata> mapMetadata;
+ private final OutputCollector<SeaTunnelRow> outputCollector;
+ private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
+
+ public KafkaRecordEmitter(
+ Map<TablePath, ConsumerMetadata> mapMetadata,
+ MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
+ this.mapMetadata = mapMetadata;
+ this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
+ this.outputCollector = new OutputCollector<>();
+ }
+
+ @Override
+ public void emitRecord(
+ ConsumerRecord<byte[], byte[]> consumerRecord,
+ Collector<SeaTunnelRow> collector,
+ KafkaSourceSplitState splitState)
+ throws Exception {
+ outputCollector.output = collector;
+ // todo there is an additional loss in this place for non-multi-table
scenarios
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
+ try {
+ if (deserializationSchema instanceof
CompatibleKafkaConnectDeserializationSchema) {
+ ((CompatibleKafkaConnectDeserializationSchema)
deserializationSchema)
+ .deserialize(consumerRecord, outputCollector);
+ } else {
+ deserializationSchema.deserialize(consumerRecord.value(),
outputCollector);
+ }
+ // consumerRecord.offset + 1 is the offset commit to Kafka and
also the start offset
+ // for the next run
+ splitState.setCurrentOffset(consumerRecord.offset() + 1);
+ } catch (IOException e) {
+ if (this.messageFormatErrorHandleWay ==
MessageFormatErrorHandleWay.SKIP) {
+ logger.warn(
+ "Deserialize message failed, skip this message,
message: {}",
+ new String(consumerRecord.value()));
+ }
+ throw e;
+ }
+ }
+
+ private static class OutputCollector<T> implements Collector<T> {
+ private Collector<T> output;
+
+ @Override
+ public void collect(T record) {
+ output.collect(record);
+ }
+
+ @Override
+ public void collect(SchemaChangeEvent event) {
+ output.collect(event);
+ }
+
+ @Override
+ public void markSchemaChangeBeforeCheckpoint() {
+ output.markSchemaChangeBeforeCheckpoint();
+ }
+
+ @Override
+ public void markSchemaChangeAfterCheckpoint() {
+ output.markSchemaChangeAfterCheckpoint();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return output.getCheckpointLock();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 0c8cb4d496..5688fde5b6 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -27,20 +27,31 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.google.common.base.Supplier;
+
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit,
KafkaSourceState>,
SupportParallelism {
+ private final ReadonlyConfig readonlyConfig;
private JobContext jobContext;
private final KafkaSourceConfig kafkaSourceConfig;
public KafkaSource(ReadonlyConfig readonlyConfig) {
+ this.readonlyConfig = readonlyConfig;
kafkaSourceConfig = new KafkaSourceConfig(readonlyConfig);
}
@@ -66,10 +77,28 @@ public class KafkaSource
@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
SourceReader.Context readerContext) {
+
+ BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue =
+ new LinkedBlockingQueue<>();
+
+ Supplier<KafkaPartitionSplitReader> kafkaPartitionSplitReaderSupplier =
+ () -> new KafkaPartitionSplitReader(kafkaSourceConfig,
readerContext);
+
+ KafkaSourceFetcherManager kafkaSourceFetcherManager =
+ new KafkaSourceFetcherManager(
+ elementsQueue, kafkaPartitionSplitReaderSupplier::get);
+ KafkaRecordEmitter kafkaRecordEmitter =
+ new KafkaRecordEmitter(
+ kafkaSourceConfig.getMapMetadata(),
+ kafkaSourceConfig.getMessageFormatErrorHandleWay());
+
return new KafkaSourceReader(
+ elementsQueue,
+ kafkaSourceFetcherManager,
+ kafkaRecordEmitter,
+ new SourceReaderOptions(readonlyConfig),
kafkaSourceConfig,
- readerContext,
- kafkaSourceConfig.getMessageFormatErrorHandleWay());
+ readerContext);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 935f8d3b84..960a018402 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -89,6 +89,7 @@ public class KafkaSourceConfig implements Serializable {
@Getter private final Properties properties;
@Getter private final long discoveryIntervalMillis;
@Getter private final MessageFormatErrorHandleWay
messageFormatErrorHandleWay;
+ @Getter private final String consumerGroup;
public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -98,6 +99,7 @@ public class KafkaSourceConfig implements Serializable {
this.discoveryIntervalMillis =
readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
this.messageFormatErrorHandleWay =
readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+ this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
}
private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
@@ -131,7 +133,6 @@ public class KafkaSourceConfig implements Serializable {
ConsumerMetadata consumerMetadata = new ConsumerMetadata();
consumerMetadata.setTopic(readonlyConfig.get(TOPIC));
consumerMetadata.setPattern(readonlyConfig.get(PATTERN));
- consumerMetadata.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP));
consumerMetadata.setProperties(new Properties());
// Create a catalog
CatalogTable catalogTable = createCatalogTable(readonlyConfig);
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 6f4753110b..82a0522c41 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,283 +17,156 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
-import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
-import
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentMap;
-@Slf4j
-public class KafkaSourceReader implements SourceReader<SeaTunnelRow,
KafkaSourceSplit> {
-
- private static final long THREAD_WAIT_TIME = 500L;
- private static final long POLL_TIMEOUT = 10000L;
+public class KafkaSourceReader
+ extends SingleThreadMultiplexSourceReaderBase<
+ ConsumerRecord<byte[], byte[]>,
+ SeaTunnelRow,
+ KafkaSourceSplit,
+ KafkaSourceSplitState> {
+ private static final Logger logger =
LoggerFactory.getLogger(KafkaSourceReader.class);
private final SourceReader.Context context;
- private final KafkaSourceConfig kafkaSourceConfig;
-
- private final Map<TablePath, ConsumerMetadata> tablePathMetadataMap;
- private final Set<KafkaSourceSplit> sourceSplits;
- private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
- private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
- private final ExecutorService executorService;
- private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
- private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
+ private final KafkaSourceConfig kafkaSourceConfig;
+ private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>>
checkpointOffsetMap;
- private volatile boolean running = false;
+ private final ConcurrentMap<TopicPartition, OffsetAndMetadata>
offsetsOfFinishedSplits;
KafkaSourceReader(
+ BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue,
+ SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>,
KafkaSourceSplit>
+ splitFetcherManager,
+ RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow,
KafkaSourceSplitState>
+ recordEmitter,
+ SourceReaderOptions options,
KafkaSourceConfig kafkaSourceConfig,
- Context context,
- MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
+ Context context) {
+ super(elementsQueue, splitFetcherManager, recordEmitter, options,
context);
this.kafkaSourceConfig = kafkaSourceConfig;
- this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
this.context = context;
- this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
- this.sourceSplits = new HashSet<>();
- this.consumerThreadMap = new ConcurrentHashMap<>();
- this.checkpointOffsetMap = new ConcurrentHashMap<>();
- this.executorService =
- Executors.newCachedThreadPool(r -> new Thread(r, "Kafka Source
Data Consumer"));
- pendingPartitionsQueue = new LinkedBlockingQueue<>();
+ this.checkpointOffsetMap = Collections.synchronizedSortedMap(new
TreeMap<>());
+ this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
}
@Override
- public void open() {}
protected void onSplitFinished(Map<String, KafkaSourceSplitState> finishedSplitIds) {
    // Record the final position of each finished split so its offset can be
    // committed back to Kafka on the next completed checkpoint.
    finishedSplitIds.forEach(
            (ignored, splitState) -> {
                if (splitState.getCurrentOffset() > 0) {
                    // currentOffset is advanced as records are emitted
                    // (presumably by the record emitter — outside this view),
                    // so it marks the next offset to resume/commit from.
                    offsetsOfFinishedSplits.put(
                            splitState.getTopicPartition(),
                            new OffsetAndMetadata(splitState.getCurrentOffset()));
                } else if (splitState.getEndOffset() > 0) {
                    // No record was emitted from this split; fall back to the
                    // split's configured end offset.
                    offsetsOfFinishedSplits.put(
                            splitState.getTopicPartition(),
                            new OffsetAndMetadata(splitState.getEndOffset()));
                }
            });
}
@Override
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdownNow();
- }
/** Wraps a newly assigned split in a mutable state object that tracks the read position. */
protected KafkaSourceSplitState initializedState(KafkaSourceSplit split) {
    return new KafkaSourceSplitState(split);
}
@Override
- public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (!running) {
- Thread.sleep(THREAD_WAIT_TIME);
- return;
- }
/** Converts the mutable tracking state back into an immutable split for checkpointing. */
protected KafkaSourceSplit toSplitType(String splitId, KafkaSourceSplitState splitState) {
    return splitState.toKafkaSourceSplit();
}
- while (!pendingPartitionsQueue.isEmpty()) {
- sourceSplits.add(pendingPartitionsQueue.poll());
+ @Override
+ public List<KafkaSourceSplit> snapshotState(long checkpointId) {
+ List<KafkaSourceSplit> sourceSplits =
super.snapshotState(checkpointId);
+ if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
+ return sourceSplits;
}
- sourceSplits.forEach(
- sourceSplit ->
- consumerThreadMap.computeIfAbsent(
- sourceSplit.getTopicPartition(),
- s -> {
- ConsumerMetadata
currentSplitConsumerMetaData =
-
tablePathMetadataMap.get(sourceSplit.getTablePath());
- KafkaConsumerThread thread =
- new KafkaConsumerThread(
- kafkaSourceConfig,
-
currentSplitConsumerMetaData);
- executorService.submit(thread);
- return thread;
- }));
- List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
- sourceSplits.forEach(
- sourceSplit -> {
- CompletableFuture<Boolean> completableFuture = new
CompletableFuture<>();
- TablePath tablePath = sourceSplit.getTablePath();
- DeserializationSchema<SeaTunnelRow> deserializationSchema =
-
tablePathMetadataMap.get(tablePath).getDeserializationSchema();
- try {
- consumerThreadMap
- .get(sourceSplit.getTopicPartition())
- .getTasks()
- .put(
- consumer -> {
- try {
- Set<TopicPartition> partitions
=
- Sets.newHashSet(
-
sourceSplit.getTopicPartition());
- consumer.assign(partitions);
- if
(sourceSplit.getStartOffset() >= 0) {
- consumer.seek(
-
sourceSplit.getTopicPartition(),
-
sourceSplit.getStartOffset());
- }
- ConsumerRecords<byte[],
byte[]> records =
- consumer.poll(
-
Duration.ofMillis(POLL_TIMEOUT));
- for (TopicPartition partition
: partitions) {
-
List<ConsumerRecord<byte[], byte[]>>
- recordList =
records.records(partition);
- if
(Boundedness.BOUNDED.equals(
-
context.getBoundedness())
- &&
recordList.isEmpty()) {
-
completableFuture.complete(true);
- return;
- }
- for
(ConsumerRecord<byte[], byte[]> record :
- recordList) {
- try {
- if
(deserializationSchema
- instanceof
-
CompatibleKafkaConnectDeserializationSchema) {
-
((CompatibleKafkaConnectDeserializationSchema)
-
deserializationSchema)
-
.deserialize(
-
record, output);
- } else {
-
deserializationSchema.deserialize(
-
record.value(), output);
- }
- } catch (IOException
e) {
- if
(this.messageFormatErrorHandleWay
- ==
MessageFormatErrorHandleWay
-
.SKIP) {
- log.warn(
-
"Deserialize message failed, skip this message, message: {}",
- new
String(record.value()));
- continue;
- }
- throw e;
- }
-
- if
(Boundedness.BOUNDED.equals(
-
context.getBoundedness())
- &&
record.offset()
- >=
sourceSplit
-
.getEndOffset()) {
-
completableFuture.complete(true);
- return;
- }
- }
- long lastOffset = -1;
- if (!recordList.isEmpty())
{
- lastOffset =
- recordList
-
.get(recordList.size() - 1)
-
.offset();
-
sourceSplit.setStartOffset(lastOffset + 1);
- }
-
- if (lastOffset >=
sourceSplit.getEndOffset()) {
-
sourceSplit.setEndOffset(lastOffset);
- }
- }
- } catch (Exception e) {
-
completableFuture.completeExceptionally(e);
- }
- completableFuture.complete(false);
- });
- if (completableFuture.get()) {
- finishedSplits.add(sourceSplit);
- }
- } catch (Exception e) {
- throw new KafkaConnectorException(
- KafkaConnectorErrorCode.CONSUME_DATA_FAILED,
e);
- }
- });
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- for (KafkaSourceSplit split : finishedSplits) {
- split.setFinish(true);
- if (split.getStartOffset() == -1) {
- // log next running read start offset
- split.setStartOffset(split.getEndOffset());
+ if (sourceSplits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
+ logger.debug(
+ "checkpoint {} does not have an offset to submit for
splits", checkpointId);
+ checkpointOffsetMap.put(checkpointId, Collections.emptyMap());
+ } else {
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap =
+ checkpointOffsetMap.computeIfAbsent(checkpointId, id ->
new HashMap<>());
+ for (KafkaSourceSplit kafkaSourceSplit : sourceSplits) {
+ if (kafkaSourceSplit.getStartOffset() >= 0) {
+ offsetAndMetadataMap.put(
+ kafkaSourceSplit.getTopicPartition(),
+ new
OffsetAndMetadata(kafkaSourceSplit.getStartOffset()));
}
}
- if (sourceSplits.stream().allMatch(KafkaSourceSplit::isFinish)) {
- context.signalNoMoreElement();
- }
+ offsetAndMetadataMap.putAll(offsetsOfFinishedSplits);
}
+ return sourceSplits;
}
@Override
- public List<KafkaSourceSplit> snapshotState(long checkpointId) {
- checkpointOffsetMap.put(
- checkpointId,
- sourceSplits.stream()
- .collect(
- Collectors.toMap(
- KafkaSourceSplit::getTopicPartition,
- KafkaSourceSplit::getStartOffset)));
- return
sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
- }
+ public void notifyCheckpointComplete(long checkpointId) {
+ logger.debug("Committing offsets for checkpoint {}", checkpointId);
+ if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
+ logger.debug("Submitting offsets after snapshot completion is
prohibited");
+ return;
+ }
+ Map<TopicPartition, OffsetAndMetadata> committedPartitions =
+ checkpointOffsetMap.get(checkpointId);
- @Override
- public void addSplits(List<KafkaSourceSplit> splits) {
- running = true;
- splits.forEach(
- s -> {
- try {
- pendingPartitionsQueue.put(s);
- } catch (InterruptedException e) {
- throw new KafkaConnectorException(
-
KafkaConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
- }
- });
- }
+ if (committedPartitions == null) {
+ logger.debug("Offsets for checkpoint {} have already been
committed.", checkpointId);
+ return;
+ }
- @Override
- public void handleNoMoreSplits() {
- log.info("receive no more splits message, this reader will not add new
split.");
+ if (committedPartitions.isEmpty()) {
+ logger.debug("There are no offsets to commit for checkpoint {}.",
checkpointId);
+ removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
+ return;
+ }
+
+ ((KafkaSourceFetcherManager) splitFetcherManager)
+ .commitOffsets(
+ committedPartitions,
+ (ignored, e) -> {
+ if (e != null) {
+ logger.warn(
+ "Failed to commit consumer offsets for
checkpoint {}",
+ checkpointId,
+ e);
+ return;
+ }
+ offsetsOfFinishedSplits
+ .keySet()
+
.removeIf(committedPartitions::containsKey);
+
removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
+ });
}
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- if (!checkpointOffsetMap.containsKey(checkpointId)) {
- log.warn("checkpoint {} do not exist or have already been
committed.", checkpointId);
- } else {
- checkpointOffsetMap
- .remove(checkpointId)
- .forEach(
- (topicPartition, offset) -> {
- try {
- consumerThreadMap
- .get(topicPartition)
- .getTasks()
- .put(
- consumer -> {
- if (kafkaSourceConfig
-
.isCommitOnCheckpoint()) {
-
Map<TopicPartition, OffsetAndMetadata>
- offsets =
new HashMap<>();
- if (offset >= 0) {
- offsets.put(
-
topicPartition,
- new
OffsetAndMetadata(
-
offset));
-
consumer.commitSync(offsets);
- }
- }
- });
- } catch (InterruptedException e) {
- log.error("commit offset to kafka failed",
e);
- }
- });
+ private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
+ while (!checkpointOffsetMap.isEmpty() &&
checkpointOffsetMap.firstKey() <= checkpointId) {
+ checkpointOffsetMap.remove(checkpointOffsetMap.firstKey());
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index f868eaed20..06ce4565c3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -151,8 +151,7 @@ public class KafkaSourceSplitEnumerator
listOffsets(topicPartitions,
OffsetSpec.earliest()));
break;
case GROUP_OFFSETS:
- topicPartitionOffsets.putAll(
- listConsumerGroupOffsets(topicPartitions,
metadata));
+
topicPartitionOffsets.putAll(listConsumerGroupOffsets(topicPartitions));
break;
case LATEST:
topicPartitionOffsets.putAll(listOffsets(topicPartitions,
OffsetSpec.latest()));
@@ -366,13 +365,12 @@ public class KafkaSourceSplitEnumerator
.get();
}
- public Map<TopicPartition, Long> listConsumerGroupOffsets(
- Collection<TopicPartition> partitions, ConsumerMetadata metadata)
+ public Map<TopicPartition, Long>
listConsumerGroupOffsets(Collection<TopicPartition> partitions)
throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsOptions options =
new ListConsumerGroupOffsetsOptions().topicPartitions(new
ArrayList<>(partitions));
return adminClient
- .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+
.listConsumerGroupOffsets(kafkaSourceConfig.getConsumerGroup(), options)
.partitionsToOffsetAndMetadata()
.thenApply(
result -> {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
new file mode 100644
index 0000000000..ab2592cc38
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+public class KafkaSourceSplitState extends KafkaSourceSplit {
+
+ private long currentOffset;
+
+ public KafkaSourceSplitState(KafkaSourceSplit sourceSplit) {
+ super(
+ sourceSplit.getTablePath(),
+ sourceSplit.getTopicPartition(),
+ sourceSplit.getStartOffset(),
+ sourceSplit.getEndOffset());
+ this.currentOffset = sourceSplit.getStartOffset();
+ }
+
+ public long getCurrentOffset() {
+ return currentOffset;
+ }
+
+ public void setCurrentOffset(long currentOffset) {
+ this.currentOffset = currentOffset;
+ }
+
+ public KafkaSourceSplit toKafkaSourceSplit() {
+ return new KafkaSourceSplit(
+ getTablePath(), getTopicPartition(), getCurrentOffset(),
getEndOffset());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
new file mode 100644
index 0000000000..bc80455725
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch;
+
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherTask;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaPartitionSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class KafkaSourceFetcherManager
+ extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>,
KafkaSourceSplit> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(KafkaSourceFetcherManager.class);
+
+ public KafkaSourceFetcherManager(
+ BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue,
+ Supplier<SplitReader<ConsumerRecord<byte[], byte[]>,
KafkaSourceSplit>>
+ splitReaderSupplier) {
+ super(elementsQueue, splitReaderSupplier);
+ }
+
+ public KafkaSourceFetcherManager(
+ BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue,
+ Supplier<SplitReader<ConsumerRecord<byte[], byte[]>,
KafkaSourceSplit>>
+ splitReaderSupplier,
+ Consumer<Collection<String>> splitFinishedHook) {
+ super(elementsQueue, splitReaderSupplier, splitFinishedHook);
+ }
+
+ public void commitOffsets(
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback callback) {
+ logger.debug("Committing offsets {}", offsetsToCommit);
+ if (offsetsToCommit.isEmpty()) {
+ return;
+ }
+ SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>
splitFetcher =
+ fetchers.get(0);
+ if (splitFetcher != null) {
+ // The fetcher thread is still running. This should be the
majority of the cases.
+ enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
+ } else {
+ splitFetcher = createSplitFetcher();
+ enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
+ startFetcher(splitFetcher);
+ }
+ }
+
+ private void enqueueOffsetsCommitTask(
+ SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>
splitFetcher,
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+ OffsetCommitCallback callback) {
+ KafkaPartitionSplitReader kafkaReader =
+ (KafkaPartitionSplitReader) splitFetcher.getSplitReader();
+
+ splitFetcher.addTask(
+ new SplitFetcherTask() {
+ @Override
+ public void run() throws IOException {
+ kafkaReader.notifyCheckpointComplete(offsetsToCommit,
callback);
+ }
+
+ @Override
+ public void wakeUp() {}
+ });
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index fa2e1930cc..668747b9db 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -43,6 +43,12 @@
<artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
index 8bc6d41cd3..6ef14eed66 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
@@ -26,11 +26,11 @@ env {
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
+ consumer.group = "ogg_multi_group"
table_list = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
- consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {