This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 10598b6aec [Improve][Kafka] kafka source refactored some reader read 
logic (#6408)
10598b6aec is described below

commit 10598b6aecca27988fcda1762bd27cbd9896bdb3
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Sep 5 10:22:42 2024 +0800

    [Improve][Kafka] kafka source refactored some reader read logic (#6408)
---
 .../common/utils/TemporaryClassLoaderContext.java  |  53 +++
 .../seatunnel/kafka/source/ConsumerMetadata.java   |   1 -
 .../kafka/source/KafkaConsumerThread.java          | 111 -----
 .../kafka/source/KafkaPartitionSplitReader.java    | 458 +++++++++++++++++++++
 .../seatunnel/kafka/source/KafkaRecordEmitter.java | 112 +++++
 .../seatunnel/kafka/source/KafkaSource.java        |  33 +-
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |   3 +-
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 343 +++++----------
 .../kafka/source/KafkaSourceSplitEnumerator.java   |   8 +-
 .../kafka/source/KafkaSourceSplitState.java        |  45 ++
 .../source/fetch/KafkaSourceFetcherManager.java    |  99 +++++
 .../connector-kafka-e2e/pom.xml                    |   6 +
 .../multiFormatIT/kafka_multi_source_to_pg.conf    |   2 +-
 13 files changed, 918 insertions(+), 356 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
new file mode 100644
index 0000000000..d5ddd48106
--- /dev/null
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TemporaryClassLoaderContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.common.utils;
+
/**
 * A resource that temporarily swaps the current thread's context class loader and restores the
 * previous one on {@link #close()}. Intended for use with try-with-resources:
 *
 * <pre>{@code
 * try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classloader)) {
 *     // code that needs the context class loader
 * }
 * }</pre>
 */
public final class TemporaryClassLoaderContext implements AutoCloseable {

    /** The thread whose context class loader was swapped. */
    private final Thread thread;

    /** The class loader to restore when this context is closed. */
    private final ClassLoader originalContextClassLoader;

    private TemporaryClassLoaderContext(Thread thread, ClassLoader originalContextClassLoader) {
        this.thread = thread;
        this.originalContextClassLoader = originalContextClassLoader;
    }

    /**
     * Installs {@code cl} as the context class loader of the current thread and returns a
     * closeable handle that puts the previous context class loader back.
     *
     * @param cl the class loader to install on the current thread
     * @return a handle that restores the previous context class loader on close
     */
    public static TemporaryClassLoaderContext of(ClassLoader cl) {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(cl);
        return new TemporaryClassLoaderContext(currentThread, previous);
    }

    /** Restores the context class loader that was active when this context was created. */
    @Override
    public void close() {
        thread.setContextClassLoader(originalContextClassLoader);
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index 8ce9dbb870..c129fbdc93 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -37,7 +37,6 @@ public class ConsumerMetadata implements Serializable {
     private String topic;
     private boolean isPattern = false;
     private Properties properties;
-    private String consumerGroup;
     private StartMode startMode = StartMode.GROUP_OFFSETS;
     private Map<TopicPartition, Long> specificStartOffsets;
     private Long startOffsetsTimestamp;
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
deleted file mode 100644
index 99b6baeaf1..0000000000
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
-public class KafkaConsumerThread implements Runnable {
-
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private static final String CLIENT_ID_PREFIX = "seatunnel";
-    private final ConsumerMetadata metadata;
-
-    private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> 
tasks;
-
-    public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig, 
ConsumerMetadata metadata) {
-        this.metadata = metadata;
-        this.tasks = new LinkedBlockingQueue<>();
-        this.consumer =
-                initConsumer(
-                        kafkaSourceConfig.getBootstrap(),
-                        metadata.getConsumerGroup(),
-                        kafkaSourceConfig.getProperties(),
-                        !kafkaSourceConfig.isCommitOnCheckpoint());
-    }
-
-    @Override
-    public void run() {
-        try {
-            while (!Thread.currentThread().isInterrupted()) {
-                try {
-                    Consumer<KafkaConsumer<byte[], byte[]>> task = 
tasks.poll(1, TimeUnit.SECONDS);
-                    if (task != null) {
-                        task.accept(consumer);
-                    }
-                } catch (Exception e) {
-                    throw new KafkaConnectorException(
-                            KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, 
e);
-                }
-            }
-        } finally {
-            try {
-                if (consumer != null) {
-                    consumer.close();
-                }
-            } catch (Throwable t) {
-                throw new 
KafkaConnectorException(KafkaConnectorErrorCode.CONSUMER_CLOSE_FAILED, t);
-            }
-        }
-    }
-
-    public LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> 
getTasks() {
-        return tasks;
-    }
-
-    private KafkaConsumer<byte[], byte[]> initConsumer(
-            String bootstrapServer,
-            String consumerGroup,
-            Properties properties,
-            boolean autoCommit) {
-        Properties props = new Properties();
-        properties.forEach(
-                (key, value) -> props.setProperty(String.valueOf(key), 
String.valueOf(value)));
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        if (this.metadata.getProperties().get("client.id") == null) {
-            props.setProperty(
-                    ConsumerConfig.CLIENT_ID_CONFIG,
-                    CLIENT_ID_PREFIX + "-consumer-" + this.hashCode());
-        } else {
-            props.setProperty(
-                    ConsumerConfig.CLIENT_ID_CONFIG,
-                    this.metadata.getProperties().get("client.id").toString());
-        }
-        props.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        props.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(autoCommit));
-
-        // Disable auto create topics feature
-        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
-        return new KafkaConsumer<>(props);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
new file mode 100644
index 0000000000..8bca82999c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.common.utils.TemporaryClassLoaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class KafkaPartitionSplitReader
+        implements SplitReader<ConsumerRecord<byte[], byte[]>, 
KafkaSourceSplit> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
+
+    private static final long POLL_TIMEOUT = 10000L;
+    private static final String CLIENT_ID_PREFIX = "seatunnel";
+    private final KafkaSourceConfig kafkaSourceConfig;
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+
+    private final Map<TopicPartition, Long> stoppingOffsets;
+
+    private final String groupId;
+
+    private final Set<String> emptySplits = new HashSet<>();
+
+    public KafkaPartitionSplitReader(
+            KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) 
{
+        this.kafkaSourceConfig = kafkaSourceConfig;
+        this.consumer = initConsumer(kafkaSourceConfig, 
context.getIndexOfSubtask());
+        this.stoppingOffsets = new HashMap<>();
+        this.groupId =
+                
kafkaSourceConfig.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+    }
+
+    @Override
+    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws 
IOException {
+        ConsumerRecords<byte[], byte[]> consumerRecords;
+        try {
+            consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not 
assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty 
(starting offset >=
+            // stopping offset). We just mark empty partitions as finished and 
return an empty
+            // record container, and this consumer will be closed by 
SplitFetcherManager.
+            KafkaPartitionSplitRecords recordsBySplits =
+                    new KafkaPartitionSplitRecords(ConsumerRecords.empty());
+            markEmptySplitsAsFinished(recordsBySplits);
+            return recordsBySplits;
+        }
+        KafkaPartitionSplitRecords recordsBySplits =
+                new KafkaPartitionSplitRecords(consumerRecords);
+        List<TopicPartition> finishedPartitions = new ArrayList<>();
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            long stoppingOffset = getStoppingOffset(tp);
+            final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
+                    consumerRecords.records(tp);
+
+            if (recordsFromPartition.size() > 0) {
+                final ConsumerRecord<byte[], byte[]> lastRecord =
+                        recordsFromPartition.get(recordsFromPartition.size() - 
1);
+
+                // After processing a record with offset of "stoppingOffset - 
1", the split reader
+                // should not continue fetching because the record with 
stoppingOffset may not
+                // exist. Keep polling will just block forever.
+                if (lastRecord.offset() >= stoppingOffset - 1) {
+                    recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
+                    finishSplitAtRecord(
+                            tp,
+                            stoppingOffset,
+                            lastRecord.offset(),
+                            finishedPartitions,
+                            recordsBySplits);
+                }
+            }
+        }
+
+        markEmptySplitsAsFinished(recordsBySplits);
+
+        if (!finishedPartitions.isEmpty()) {
+            unassignPartitions(finishedPartitions);
+        }
+
+        return recordsBySplits;
+    }
+
+    private void finishSplitAtRecord(
+            TopicPartition tp,
+            long stoppingOffset,
+            long currentOffset,
+            List<TopicPartition> finishedPartitions,
+            KafkaPartitionSplitRecords recordsBySplits) {
+        LOG.debug(
+                "{} has reached stopping offset {}, current offset is {}",
+                tp,
+                stoppingOffset,
+                currentOffset);
+        finishedPartitions.add(tp);
+        recordsBySplits.addFinishedSplit(tp.toString());
+    }
+
+    private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords 
recordsBySplits) {
+        // Some splits are discovered as empty when handling split additions. 
These splits should be
+        // added to finished splits to clean up states in split fetcher and 
source reader.
+        if (!emptySplits.isEmpty()) {
+            recordsBySplits.finishedSplits.addAll(emptySplits);
+            emptySplits.clear();
+        }
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<KafkaSourceSplit> 
splitsChange) {
+        // Get all the partition assignments and stopping offsets.
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChange.getClass()));
+        }
+
+        // Assignment.
+        List<TopicPartition> newPartitionAssignments = new ArrayList<>();
+        // Starting offsets.
+        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new 
HashMap<>();
+        List<TopicPartition> partitionsStartingFromEarliest = new 
ArrayList<>();
+        List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
+        // Stopping offsets.
+        List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
+
+        // Parse the starting and stopping offsets.
+        splitsChange
+                .splits()
+                .forEach(
+                        s -> {
+                            newPartitionAssignments.add(s.getTopicPartition());
+                            parseStartingOffsets(
+                                    s,
+                                    partitionsStartingFromEarliest,
+                                    partitionsStartingFromLatest,
+                                    partitionsStartingFromSpecifiedOffsets);
+                            parseStoppingOffsets(s, 
partitionsStoppingAtLatest);
+                        });
+
+        // Assign new partitions.
+        newPartitionAssignments.addAll(consumer.assignment());
+        consumer.assign(newPartitionAssignments);
+
+        // Seek on the newly assigned partitions to their stating offsets.
+        seekToStartingOffsets(
+                partitionsStartingFromEarliest,
+                partitionsStartingFromLatest,
+                partitionsStartingFromSpecifiedOffsets);
+        // Setup the stopping offsets.
+        acquireAndSetStoppingOffsets(partitionsStoppingAtLatest);
+
+        // After acquiring the starting and stopping offsets, remove the empty 
splits if necessary.
+        removeEmptySplits();
+
+        maybeLogSplitChangesHandlingResult(splitsChange);
+    }
+
+    private void 
maybeLogSplitChangesHandlingResult(SplitsChange<KafkaSourceSplit> splitsChange) 
{
+        if (LOG.isDebugEnabled()) {
+            StringJoiner splitsInfo = new StringJoiner(",");
+            Set<TopicPartition> assginment = consumer.assignment();
+            for (KafkaSourceSplit split : splitsChange.splits()) {
+                if (!assginment.contains(split.getTopicPartition())) {
+                    continue;
+                }
+
+                long startingOffset =
+                        retryOnWakeup(
+                                () -> 
consumer.position(split.getTopicPartition()),
+                                "logging starting position");
+                long stoppingOffset = 
getStoppingOffset(split.getTopicPartition());
+                splitsInfo.add(
+                        String.format(
+                                "[%s, start:%d, stop: %d]",
+                                split.getTopicPartition(), startingOffset, 
stoppingOffset));
+            }
+            LOG.debug("SplitsChange handling result: {}", splitsInfo);
+        }
+    }
+
+    private void removeEmptySplits() {
+        List<TopicPartition> emptyPartitions = new ArrayList<>();
+        // If none of the partitions have any records,
+        for (TopicPartition tp : consumer.assignment()) {
+            if (retryOnWakeup(
+                            () -> consumer.position(tp),
+                            "getting starting offset to check if split is 
empty")
+                    >= getStoppingOffset(tp)) {
+                emptyPartitions.add(tp);
+            }
+        }
+        if (!emptyPartitions.isEmpty()) {
+            LOG.debug(
+                    "These assigning splits are empty and will be marked as 
finished in later fetch: {}",
+                    emptyPartitions);
+            // Add empty partitions to empty split set for later cleanup in 
fetch()
+            emptySplits.addAll(
+                    emptyPartitions.stream()
+                            .map(TopicPartition::toString)
+                            .collect(Collectors.toSet()));
+            // Un-assign partitions from Kafka consumer
+            unassignPartitions(emptyPartitions);
+        }
+    }
+
+    private void unassignPartitions(Collection<TopicPartition> 
partitionsToUnassign) {
+        Collection<TopicPartition> newAssignment = new 
HashSet<>(consumer.assignment());
+        newAssignment.removeAll(partitionsToUnassign);
+        consumer.assign(newAssignment);
+    }
+
+    private void acquireAndSetStoppingOffsets(List<TopicPartition> 
partitionsStoppingAtLatest) {
+        Map<TopicPartition, Long> endOffset = 
consumer.endOffsets(partitionsStoppingAtLatest);
+        stoppingOffsets.putAll(endOffset);
+    }
+
+    private void seekToStartingOffsets(
+            List<TopicPartition> partitionsStartingFromEarliest,
+            List<TopicPartition> partitionsStartingFromLatest,
+            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+
+        if (!partitionsStartingFromEarliest.isEmpty()) {
+            LOG.trace("Seeking starting offsets to beginning: {}", 
partitionsStartingFromEarliest);
+            consumer.seekToBeginning(partitionsStartingFromEarliest);
+        }
+
+        if (!partitionsStartingFromLatest.isEmpty()) {
+            LOG.trace("Seeking starting offsets to end: {}", 
partitionsStartingFromLatest);
+            consumer.seekToEnd(partitionsStartingFromLatest);
+        }
+
+        if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
+            LOG.trace(
+                    "Seeking starting offsets to specified offsets: {}",
+                    partitionsStartingFromSpecifiedOffsets);
+            partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
+        }
+    }
+
+    private void parseStoppingOffsets(
+            KafkaSourceSplit split, List<TopicPartition> 
partitionsStoppingAtLatest) {
+        TopicPartition tp = split.getTopicPartition();
+        if (split.getEndOffset() >= 0) {
+            stoppingOffsets.put(tp, split.getEndOffset());
+        } else {
+            partitionsStoppingAtLatest.add(tp);
+        }
+    }
+
+    private long getStoppingOffset(TopicPartition tp) {
+        return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
+    }
+
+    private void parseStartingOffsets(
+            KafkaSourceSplit split,
+            List<TopicPartition> partitionsStartingFromEarliest,
+            List<TopicPartition> partitionsStartingFromLatest,
+            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
+        TopicPartition tp = split.getTopicPartition();
+        // Parse starting offsets.
+        ConsumerMetadata metadata = 
kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
+        if (metadata.getStartMode() == StartMode.EARLIEST) {
+            partitionsStartingFromEarliest.add(tp);
+        } else if (metadata.getStartMode() == StartMode.LATEST) {
+            partitionsStartingFromLatest.add(tp);
+        } else if (metadata.getStartMode() == StartMode.GROUP_OFFSETS) {
+            // Do nothing here, the consumer will first try to get the 
committed offsets of
+            // these partitions by default.
+        } else {
+            partitionsStartingFromSpecifiedOffsets.put(tp, 
split.getStartOffset());
+        }
+    }
+
+    @Override
+    public void wakeUp() {
+        consumer.wakeup();
+    }
+
+    @Override
+    public void close() throws Exception {
+        consumer.close();
+    }
+
+    public void notifyCheckpointComplete(
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+            OffsetCommitCallback offsetCommitCallback) {
+        consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+    }
+
+    private KafkaConsumer<byte[], byte[]> initConsumer(
+            KafkaSourceConfig kafkaSourceConfig, int subtaskId) {
+
+        try (TemporaryClassLoaderContext ignored =
+                
TemporaryClassLoaderContext.of(kafkaSourceConfig.getClass().getClassLoader())) {
+            Properties props = new Properties();
+            kafkaSourceConfig
+                    .getProperties()
+                    .forEach(
+                            (key, value) ->
+                                    props.setProperty(String.valueOf(key), 
String.valueOf(value)));
+            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
kafkaSourceConfig.getConsumerGroup());
+            props.setProperty(
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaSourceConfig.getBootstrap());
+            if (this.kafkaSourceConfig.getProperties().get("client.id") == 
null) {
+                props.setProperty(
+                        ConsumerConfig.CLIENT_ID_CONFIG,
+                        CLIENT_ID_PREFIX + "-consumer-" + subtaskId);
+            } else {
+                props.setProperty(
+                        ConsumerConfig.CLIENT_ID_CONFIG,
+                        
this.kafkaSourceConfig.getProperties().get("client.id").toString()
+                                + "-"
+                                + subtaskId);
+            }
+            props.setProperty(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                    ByteArrayDeserializer.class.getName());
+            props.setProperty(
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                    ByteArrayDeserializer.class.getName());
+            props.setProperty(
+                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                    String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));
+
+            // Disable auto create topics feature
+            props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
+            return new KafkaConsumer<>(props);
+        }
+    }
+
+    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+        try {
+            return consumerCall.get();
+        } catch (WakeupException we) {
+            LOG.info(
+                    "Caught WakeupException while executing Kafka consumer 
call for {}. Will retry the consumer call.",
+                    description);
+            return consumerCall.get();
+        }
+    }
+
+    private static class KafkaPartitionSplitRecords
+            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+        private final Set<String> finishedSplits = new HashSet<>();
+        private final Map<TopicPartition, Long> stoppingOffsets = new 
HashMap<>();
+        private final ConsumerRecords<byte[], byte[]> consumerRecords;
+        private final Iterator<TopicPartition> splitIterator;
+        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+        private TopicPartition currentTopicPartition;
+        private Long currentSplitStoppingOffset;
+
+        private KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]> 
consumerRecords) {
+            this.consumerRecords = consumerRecords;
+            this.splitIterator = consumerRecords.partitions().iterator();
+        }
+
+        private void setPartitionStoppingOffset(
+                TopicPartition topicPartition, long stoppingOffset) {
+            stoppingOffsets.put(topicPartition, stoppingOffset);
+        }
+
+        private void addFinishedSplit(String splitId) {
+            finishedSplits.add(splitId);
+        }
+
+        @Nullable @Override
+        public String nextSplit() {
+            if (splitIterator.hasNext()) {
+                currentTopicPartition = splitIterator.next();
+                recordIterator = 
consumerRecords.records(currentTopicPartition).iterator();
+                currentSplitStoppingOffset =
+                        stoppingOffsets.getOrDefault(currentTopicPartition, 
Long.MAX_VALUE);
+                return currentTopicPartition.toString();
+            } else {
+                currentTopicPartition = null;
+                recordIterator = null;
+                currentSplitStoppingOffset = null;
+                return null;
+            }
+        }
+
+        @Nullable @Override
+        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
+            Preconditions.checkNotNull(
+                    currentTopicPartition,
+                    "Make sure nextSplit() did not return null before "
+                            + "iterate over the records split.");
+            if (recordIterator.hasNext()) {
+                final ConsumerRecord<byte[], byte[]> record = 
recordIterator.next();
+                // Only emit records before stopping offset
+                if (record.offset() < currentSplitStoppingOffset) {
+                    return record;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public Set<String> finishedSplits() {
+            return finishedSplits;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
new file mode 100644
index 0000000000..6593137aff
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class KafkaRecordEmitter
+        implements RecordEmitter<
+                ConsumerRecord<byte[], byte[]>, SeaTunnelRow, 
KafkaSourceSplitState> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordEmitter.class);
+    private final Map<TablePath, ConsumerMetadata> mapMetadata;
+    private final OutputCollector<SeaTunnelRow> outputCollector;
+    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
+
+    public KafkaRecordEmitter(
+            Map<TablePath, ConsumerMetadata> mapMetadata,
+            MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
+        this.mapMetadata = mapMetadata;
+        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
+        this.outputCollector = new OutputCollector<>();
+    }
+
+    @Override
+    public void emitRecord(
+            ConsumerRecord<byte[], byte[]> consumerRecord,
+            Collector<SeaTunnelRow> collector,
+            KafkaSourceSplitState splitState)
+            throws Exception {
+        outputCollector.output = collector;
+        // todo there is an additional loss in this place for non-multi-table 
scenarios
+        DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
+        try {
+            if (deserializationSchema instanceof 
CompatibleKafkaConnectDeserializationSchema) {
+                ((CompatibleKafkaConnectDeserializationSchema) 
deserializationSchema)
+                        .deserialize(consumerRecord, outputCollector);
+            } else {
+                deserializationSchema.deserialize(consumerRecord.value(), 
outputCollector);
+            }
+            // consumerRecord.offset + 1 is the offset commit to Kafka and 
also the start offset
+            // for the next run
+            splitState.setCurrentOffset(consumerRecord.offset() + 1);
+        } catch (IOException e) {
+            if (this.messageFormatErrorHandleWay == 
MessageFormatErrorHandleWay.SKIP) {
+                logger.warn(
+                        "Deserialize message failed, skip this message, 
message: {}",
+                        new String(consumerRecord.value()));
+            }
+            throw e;
+        }
+    }
+
+    private static class OutputCollector<T> implements Collector<T> {
+        private Collector<T> output;
+
+        @Override
+        public void collect(T record) {
+            output.collect(record);
+        }
+
+        @Override
+        public void collect(SchemaChangeEvent event) {
+            output.collect(event);
+        }
+
+        @Override
+        public void markSchemaChangeBeforeCheckpoint() {
+            output.markSchemaChangeBeforeCheckpoint();
+        }
+
+        @Override
+        public void markSchemaChangeAfterCheckpoint() {
+            output.markSchemaChangeAfterCheckpoint();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return output.getCheckpointLock();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 0c8cb4d496..5688fde5b6 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -27,20 +27,31 @@ import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.google.common.base.Supplier;
+
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 public class KafkaSource
         implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, 
KafkaSourceState>,
                 SupportParallelism {
 
+    private final ReadonlyConfig readonlyConfig;
     private JobContext jobContext;
 
     private final KafkaSourceConfig kafkaSourceConfig;
 
    public KafkaSource(ReadonlyConfig readonlyConfig) {
        // Keep the raw config: createReader needs it to build SourceReaderOptions.
        this.readonlyConfig = readonlyConfig;
        kafkaSourceConfig = new KafkaSourceConfig(readonlyConfig);
    }
 
@@ -66,10 +77,28 @@ public class KafkaSource
    /**
     * Wires up the split-fetcher based reader: a blocking hand-off queue connects the
     * fetcher-side {@link KafkaPartitionSplitReader} (managed by
     * {@link KafkaSourceFetcherManager}) with the {@link KafkaRecordEmitter} that turns raw
     * Kafka records into {@link SeaTunnelRow}s.
     */
    @Override
    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
            SourceReader.Context readerContext) {

        // Hand-off queue between the fetcher thread and the reader's emit loop.
        BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
                new LinkedBlockingQueue<>();

        // Factory for split readers; the fetcher manager decides when to invoke it.
        Supplier<KafkaPartitionSplitReader> kafkaPartitionSplitReaderSupplier =
                () -> new KafkaPartitionSplitReader(kafkaSourceConfig, readerContext);

        KafkaSourceFetcherManager kafkaSourceFetcherManager =
                new KafkaSourceFetcherManager(
                        elementsQueue, kafkaPartitionSplitReaderSupplier::get);
        KafkaRecordEmitter kafkaRecordEmitter =
                new KafkaRecordEmitter(
                        kafkaSourceConfig.getMapMetadata(),
                        kafkaSourceConfig.getMessageFormatErrorHandleWay());

        return new KafkaSourceReader(
                elementsQueue,
                kafkaSourceFetcherManager,
                kafkaRecordEmitter,
                new SourceReaderOptions(readonlyConfig),
                kafkaSourceConfig,
                readerContext);
    }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 935f8d3b84..960a018402 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -89,6 +89,7 @@ public class KafkaSourceConfig implements Serializable {
     @Getter private final Properties properties;
     @Getter private final long discoveryIntervalMillis;
     @Getter private final MessageFormatErrorHandleWay 
messageFormatErrorHandleWay;
+    @Getter private final String consumerGroup;
 
     public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -98,6 +99,7 @@ public class KafkaSourceConfig implements Serializable {
         this.discoveryIntervalMillis = 
readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
         this.messageFormatErrorHandleWay =
                 readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+        this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
     }
 
     private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
@@ -131,7 +133,6 @@ public class KafkaSourceConfig implements Serializable {
         ConsumerMetadata consumerMetadata = new ConsumerMetadata();
         consumerMetadata.setTopic(readonlyConfig.get(TOPIC));
         consumerMetadata.setPattern(readonlyConfig.get(PATTERN));
-        consumerMetadata.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP));
         consumerMetadata.setProperties(new Properties());
         // Create a catalog
         CatalogTable catalogTable = createCatalogTable(readonlyConfig);
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 6f4753110b..82a0522c41 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,283 +17,156 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
-import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
-import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentMap;
 
-@Slf4j
-public class KafkaSourceReader implements SourceReader<SeaTunnelRow, 
KafkaSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-    private static final long POLL_TIMEOUT = 10000L;
/**
 * Kafka {@link SourceReader} built on the shared single-thread-multiplex reader base: split
 * fetching and polling happen in the fetcher manager's thread, records flow through the
 * configured {@code RecordEmitter}, and this class only tracks which offsets to commit per
 * checkpoint.
 */
public class KafkaSourceReader
        extends SingleThreadMultiplexSourceReaderBase<
                ConsumerRecord<byte[], byte[]>,
                SeaTunnelRow,
                KafkaSourceSplit,
                KafkaSourceSplitState> {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceReader.class);
    private final SourceReader.Context context;

    private final KafkaSourceConfig kafkaSourceConfig;
    // checkpointId -> offsets captured for that checkpoint; sorted so completed checkpoints
    // can be pruned oldest-first in removeAllOffsetsToCommitUpToCheckpoint.
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> checkpointOffsetMap;

    // Final offsets of splits that finished reading; merged into the next checkpoint's commit.
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;

    KafkaSourceReader(
            BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue,
            SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>
                    splitFetcherManager,
            RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState>
                    recordEmitter,
            SourceReaderOptions options,
            KafkaSourceConfig kafkaSourceConfig,
            Context context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, options, context);
        this.kafkaSourceConfig = kafkaSourceConfig;
        this.context = context;
        this.checkpointOffsetMap = Collections.synchronizedSortedMap(new TreeMap<>());
        this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
    }

    @Override
    protected void onSplitFinished(Map<String, KafkaSourceSplitState> finishedSplitIds) {
        // Remember the last offset of each finished split so it is still committed on the
        // next checkpoint; prefer the live current offset, fall back to the split's end offset.
        finishedSplitIds.forEach(
                (ignored, splitState) -> {
                    if (splitState.getCurrentOffset() > 0) {
                        offsetsOfFinishedSplits.put(
                                splitState.getTopicPartition(),
                                new OffsetAndMetadata(splitState.getCurrentOffset()));
                    } else if (splitState.getEndOffset() > 0) {
                        offsetsOfFinishedSplits.put(
                                splitState.getTopicPartition(),
                                new OffsetAndMetadata(splitState.getEndOffset()));
                    }
                });
    }

    /** Wraps a newly assigned split in its mutable reader-side state. */
    @Override
    protected KafkaSourceSplitState initializedState(KafkaSourceSplit split) {
        return new KafkaSourceSplitState(split);
    }

    /** Converts mutable split state back to an immutable split for checkpointing. */
    @Override
    protected KafkaSourceSplit toSplitType(String splitId, KafkaSourceSplitState splitState) {
        return splitState.toKafkaSourceSplit();
    }

    @Override
    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        List<KafkaSourceSplit> sourceSplits = super.snapshotState(checkpointId);
        if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
            // Offset committing disabled: nothing to record besides the splits themselves.
            return sourceSplits;
        }
        if (sourceSplits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
            logger.debug(
                    "checkpoint {} does not have an offset to submit for splits", checkpointId);
            // Record an empty entry so notifyCheckpointComplete can still prune this id.
            checkpointOffsetMap.put(checkpointId, Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap =
                    checkpointOffsetMap.computeIfAbsent(checkpointId, id -> new HashMap<>());
            for (KafkaSourceSplit kafkaSourceSplit : sourceSplits) {
                // startOffset is the next record to read, i.e. exactly the offset to commit.
                if (kafkaSourceSplit.getStartOffset() >= 0) {
                    offsetAndMetadataMap.put(
                            kafkaSourceSplit.getTopicPartition(),
                            new OffsetAndMetadata(kafkaSourceSplit.getStartOffset()));
                }
            }
            offsetAndMetadataMap.putAll(offsetsOfFinishedSplits);
        }
        return sourceSplits;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        logger.debug("Committing offsets for checkpoint {}", checkpointId);
        if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
            logger.debug("Submitting offsets after snapshot completion is prohibited");
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> committedPartitions =
                checkpointOffsetMap.get(checkpointId);

        if (committedPartitions == null) {
            // Already pruned by an earlier (later-id) checkpoint completion.
            logger.debug("Offsets for checkpoint {} have already been committed.", checkpointId);
            return;
        }

        if (committedPartitions.isEmpty()) {
            logger.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
            return;
        }

        // Delegate the actual commit to the fetcher's consumer thread; the callback runs
        // when the async commit finishes.
        ((KafkaSourceFetcherManager) splitFetcherManager)
                .commitOffsets(
                        committedPartitions,
                        (ignored, e) -> {
                            if (e != null) {
                                // Best-effort: a failed commit is logged, not fatal; offsets
                                // stay queued and may be retried with a later checkpoint.
                                logger.warn(
                                        "Failed to commit consumer offsets for checkpoint {}",
                                        checkpointId,
                                        e);
                                return;
                            }
                            offsetsOfFinishedSplits
                                    .keySet()
                                    .removeIf(committedPartitions::containsKey);
                            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
                        });
    }

    // Drops every pending entry whose checkpoint id is <= the given id (oldest first).
    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
        while (!checkpointOffsetMap.isEmpty() && checkpointOffsetMap.firstKey() <= checkpointId) {
            checkpointOffsetMap.remove(checkpointOffsetMap.firstKey());
        }
    }
}
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index f868eaed20..06ce4565c3 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -151,8 +151,7 @@ public class KafkaSourceSplitEnumerator
                             listOffsets(topicPartitions, 
OffsetSpec.earliest()));
                     break;
                 case GROUP_OFFSETS:
-                    topicPartitionOffsets.putAll(
-                            listConsumerGroupOffsets(topicPartitions, 
metadata));
+                    
topicPartitionOffsets.putAll(listConsumerGroupOffsets(topicPartitions));
                     break;
                 case LATEST:
                     topicPartitionOffsets.putAll(listOffsets(topicPartitions, 
OffsetSpec.latest()));
@@ -366,13 +365,12 @@ public class KafkaSourceSplitEnumerator
                 .get();
     }
 
-    public Map<TopicPartition, Long> listConsumerGroupOffsets(
-            Collection<TopicPartition> partitions, ConsumerMetadata metadata)
+    public Map<TopicPartition, Long> 
listConsumerGroupOffsets(Collection<TopicPartition> partitions)
             throws ExecutionException, InterruptedException {
         ListConsumerGroupOffsetsOptions options =
                 new ListConsumerGroupOffsetsOptions().topicPartitions(new 
ArrayList<>(partitions));
         return adminClient
-                .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+                
.listConsumerGroupOffsets(kafkaSourceConfig.getConsumerGroup(), options)
                 .partitionsToOffsetAndMetadata()
                 .thenApply(
                         result -> {
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
new file mode 100644
index 0000000000..ab2592cc38
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+/**
+ * Mutable reader-side state for a {@link KafkaSourceSplit}: the immutable split
+ * identity (table path, topic partition, start/end offsets) plus the current
+ * consuming offset, which advances as records are read.
+ */
+public class KafkaSourceSplitState extends KafkaSourceSplit {
+
+    // Offset tracking progress within the split; initialized to the split's
+    // start offset and updated via setCurrentOffset as records are consumed.
+    private long currentOffset;
+
+    /** Wraps an assigned split, starting consumption at its start offset. */
+    public KafkaSourceSplitState(KafkaSourceSplit sourceSplit) {
+        super(
+                sourceSplit.getTablePath(),
+                sourceSplit.getTopicPartition(),
+                sourceSplit.getStartOffset(),
+                sourceSplit.getEndOffset());
+        this.currentOffset = sourceSplit.getStartOffset();
+    }
+
+    public long getCurrentOffset() {
+        return currentOffset;
+    }
+
+    public void setCurrentOffset(long currentOffset) {
+        this.currentOffset = currentOffset;
+    }
+
+    /**
+     * Snapshots this state back into an immutable split whose start offset is
+     * the current progress — used when checkpointing reader state.
+     */
+    public KafkaSourceSplit toKafkaSourceSplit() {
+        return new KafkaSourceSplit(
+                getTablePath(), getTopicPartition(), getCurrentOffset(), 
getEndOffset());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
new file mode 100644
index 0000000000..bc80455725
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/fetch/KafkaSourceFetcherManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch;
+
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherTask;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaPartitionSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Single-thread fetcher manager for the Kafka source. Beyond the base class's
+ * split-fetching duties, it routes offset-commit requests onto the fetcher
+ * thread so that the (non-thread-safe) KafkaConsumer is only touched from that
+ * thread.
+ */
+public class KafkaSourceFetcherManager
+        extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, 
KafkaSourceSplit> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(KafkaSourceFetcherManager.class);
+
+    /**
+     * @param elementsQueue queue handing fetched record batches to the reader
+     * @param splitReaderSupplier factory for the underlying Kafka split reader
+     */
+    public KafkaSourceFetcherManager(
+            BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> 
elementsQueue,
+            Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, 
KafkaSourceSplit>>
+                    splitReaderSupplier) {
+        super(elementsQueue, splitReaderSupplier);
+    }
+
+    /**
+     * Variant with a hook invoked with the ids of splits that have finished.
+     */
+    public KafkaSourceFetcherManager(
+            BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> 
elementsQueue,
+            Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, 
KafkaSourceSplit>>
+                    splitReaderSupplier,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, splitFinishedHook);
+    }
+
+    /**
+     * Asynchronously commits the given offsets by enqueueing a commit task on
+     * fetcher 0, creating and starting a fetcher if none is running. No-op for
+     * an empty offset map; {@code callback} fires when the commit completes.
+     */
+    public void commitOffsets(
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, 
OffsetCommitCallback callback) {
+        logger.debug("Committing offsets {}", offsetsToCommit);
+        if (offsetsToCommit.isEmpty()) {
+            return;
+        }
+        SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> 
splitFetcher =
+                fetchers.get(0);
+        if (splitFetcher != null) {
+            // The fetcher thread is still running. This should be the 
majority of the cases.
+            enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
+        } else {
+            // No live fetcher (e.g. all splits finished): spin one up solely
+            // to perform the commit, enqueueing the task before it starts.
+            splitFetcher = createSplitFetcher();
+            enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    // Wraps the commit in a SplitFetcherTask so it executes on the fetcher
+    // thread, delegating to the reader's notifyCheckpointComplete.
+    private void enqueueOffsetsCommitTask(
+            SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> 
splitFetcher,
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+            OffsetCommitCallback callback) {
+        KafkaPartitionSplitReader kafkaReader =
+                (KafkaPartitionSplitReader) splitFetcher.getSplitReader();
+
+        splitFetcher.addTask(
+                new SplitFetcherTask() {
+                    @Override
+                    public void run() throws IOException {
+                        kafkaReader.notifyCheckpointComplete(offsetsToCommit, 
callback);
+                    }
+
+                    // Commit task is quick and non-blocking to cancel; nothing
+                    // to interrupt.
+                    @Override
+                    public void wakeUp() {}
+                });
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index fa2e1930cc..668747b9db 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -43,6 +43,12 @@
             <artifactId>connector-cdc-mysql</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
index 8bc6d41cd3..6ef14eed66 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/multiFormatIT/kafka_multi_source_to_pg.conf
@@ -26,11 +26,11 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafka_e2e:9092"
+    consumer.group = "ogg_multi_group"
     table_list = [
       {
         topic = "^test-ogg-sou.*"
         pattern = "true"
-        consumer.group = "ogg_multi_group"
         start_mode = earliest
         schema = {
           fields {

Reply via email to