This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 691aabcb [api-draft] Support coordinated source & Fix checkpoint lock 
implementation (#1951)
691aabcb is described below

commit 691aabcb873392349af9e6379bd29a1fb5dd9f01
Author: Zongwen Li <[email protected]>
AuthorDate: Fri May 27 09:55:50 2022 +0800

    [api-draft] Support coordinated source & Fix checkpoint lock implementation 
(#1951)
    
    * [api-draft][flink] Support coordinated source & Fix checkpoint lock 
implementation
    
    * [api-draft][spark] Support coordinated source
    
    * [api-draft][spark] Coordinated source support parallel checkpoint lock
---
 .../org/apache/seatunnel/api/source/Collector.java |   4 +-
 .../apache/seatunnel/api/state/CheckpointLock.java |  26 ---
 .../translation/source/BaseSourceFunction.java}    |  23 +-
 .../source/CoordinatedEnumeratorContext.java       |  10 +-
 ...rContext.java => CoordinatedReaderContext.java} |  24 +-
 .../translation/source/CoordinatedSource.java      | 241 ++++++++++++++++++++-
 .../translation/source/ParallelReaderContext.java  |   1 -
 .../translation/source/ParallelSource.java         |  49 +++--
 .../seatunnel/translation/state/EmptyLock.java     |  32 ---
 ...ource.java => BaseSeaTunnelSourceFunction.java} |  67 +++---
 .../flink/source/FlinkCheckpointLock.java          |  89 --------
 .../translation/flink/source/RowCollector.java     |   7 +-
 .../flink/source/SeaTunnelCoordinatedSource.java   |  43 ++++
 .../flink/source/SeaTunnelParallelSource.java      | 114 +---------
 .../spark/source/InternalRowCollector.java         |  12 +-
 .../translation/spark/source/ReaderState.java      |   7 +-
 .../spark/source/SeaTunnelSourceSupport.java       |  23 +-
 .../spark/source/batch/BatchPartition.java         |   7 +-
 ...lelSourceReader.java => BatchSourceReader.java} |  17 +-
 .../batch/CoordinatedBatchPartitionReader.java     | 100 +++++++++
 ...ader.java => ParallelBatchPartitionReader.java} |  45 ++--
 .../source/continnous/ContinuousPartition.java     |   7 +-
 ...urceReader.java => ContinuousSourceReader.java} |   7 +-
 ...java => ParallelContinuousPartitionReader.java} |  17 +-
 .../CoordinatedMicroBatchPartitionReader.java      | 134 ++++++++++++
 .../spark/source/micro/MicroBatchPartition.java    |   7 +-
 ...urceReader.java => MicroBatchSourceReader.java} |  17 +-
 ...java => ParallelMicroBatchPartitionReader.java} |  78 +++----
 28 files changed, 756 insertions(+), 452 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index a2bc77ba..0b924bb5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.api.source;
 
-import org.apache.seatunnel.api.state.CheckpointLock;
-
 /**
  * A {@link Collector} is used to collect data from {@link SourceReader}.
  *
@@ -33,5 +31,5 @@ public interface Collector<T> {
      *
      * @return The object to use as the lock
      */
-    CheckpointLock getCheckpointLock();
+    Object getCheckpointLock();
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
deleted file mode 100644
index 4a63cb15..00000000
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.state;
-
-public interface CheckpointLock {
-
-    void lock();
-
-    void unlock();
-
-}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
similarity index 63%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
rename to 
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
index d0ceb189..eaa7325e 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.spark.source.micro;
+package org.apache.seatunnel.translation.source;
 
-import org.apache.seatunnel.api.state.CheckpointLock;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.state.CheckpointListener;
 
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.List;
+import java.util.Map;
 
-public class SingleReaderCheckpointLock implements CheckpointLock {
-    private final ReentrantLock fairLock = new ReentrantLock(true);
+public interface BaseSourceFunction<T> extends AutoCloseable, 
CheckpointListener {
 
-    @Override
-    public void lock() {
-        fairLock.lock();
-    }
+    void open() throws Exception;
 
-    @Override
-    public void unlock() {
-        fairLock.unlock();
-    }
+    void run(Collector<T> collector) throws Exception;
+
+    Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws 
Exception;
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
index 8c06e3c8..5f585b97 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -34,27 +34,27 @@ public class CoordinatedEnumeratorContext<SplitT extends 
SourceSplit> implements
 
     @Override
     public int currentParallelism() {
-        return 0;
+        return coordinatedSource.currentReaderCount();
     }
 
     @Override
     public Set<Integer> registeredReaders() {
-        return null;
+        return coordinatedSource.registeredReaders();
     }
 
     @Override
     public void assignSplit(int subtaskId, List<SplitT> splits) {
-
+        coordinatedSource.addSplits(subtaskId, splits);
     }
 
     @Override
     public void signalNoMoreSplits(int subtaskId) {
-
+        coordinatedSource.handleNoMoreSplits(subtaskId);
     }
 
     @Override
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
-
+        coordinatedSource.handleEnumeratorEvent(subtaskId, event);
     }
 
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
similarity index 69%
copy from 
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
copy to 
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
index 9080c578..6f410844 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
@@ -21,44 +21,42 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 
-public class ParallelReaderContext implements SourceReader.Context {
+public class CoordinatedReaderContext implements SourceReader.Context {
 
-    protected final ParallelSource<?, ?, ?> parallelSource;
+    protected final CoordinatedSource<?, ?, ?> coordinatedSource;
     protected final Boundedness boundedness;
     protected final Integer subtaskId;
 
-    public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
-                                 Boundedness boundedness,
-                                 Integer subtaskId) {
-        this.parallelSource = parallelSource;
+    public CoordinatedReaderContext(CoordinatedSource<?, ?, ?> 
coordinatedSource,
+                                    Boundedness boundedness,
+                                    Integer subtaskId) {
+        this.coordinatedSource = coordinatedSource;
         this.boundedness = boundedness;
         this.subtaskId = subtaskId;
     }
 
     @Override
     public int getIndexOfSubtask() {
-        return subtaskId;
+        return this.subtaskId;
     }
 
     @Override
     public Boundedness getBoundedness() {
-        return boundedness;
+        return this.boundedness;
     }
 
     @Override
     public void signalNoMoreElement() {
-        // todo: if we have multiple subtasks, we need to know if all subtask 
is stopped
-        parallelSource.handleNoMoreElement();
+        coordinatedSource.handleNoMoreElement(subtaskId);
     }
 
     @Override
     public void sendSplitRequest() {
-        parallelSource.handleSplitRequest(subtaskId);
+        coordinatedSource.handleSplitRequest(subtaskId);
     }
 
     @Override
     public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
-        // TODO: exception
-        throw new RuntimeException("");
+        coordinatedSource.handleReaderEvent(subtaskId, sourceEvent);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 142bd8e3..9806a6d0 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -17,13 +17,250 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
-public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> {
-    protected volatile Map<Integer, SourceReader<T, SplitT>> readerMap = new 
ConcurrentHashMap<>();
+public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> 
implements BaseSourceFunction<T> {
+    protected static final long SLEEP_TIME_INTERVAL = 5L;
+    protected final SeaTunnelSource<T, SplitT, StateT> source;
+    protected final Map<Integer, List<byte[]>> restoredState;
+    protected final Integer parallelism;
 
+    protected final Serializer<SplitT> splitSerializer;
+    protected final Serializer<StateT> enumeratorStateSerializer;
+
+    protected final CoordinatedEnumeratorContext<SplitT> 
coordinatedEnumeratorContext;
+    protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
+    protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new 
HashMap<>();
+
+    protected transient volatile SourceSplitEnumerator<SplitT, StateT> 
splitEnumerator;
+    protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new 
ConcurrentHashMap<>();
+    protected final Map<Integer, AtomicBoolean> readerRunningMap;
+    protected transient volatile ScheduledThreadPoolExecutor executorService;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    protected volatile boolean running = true;
+
+    public CoordinatedSource(SeaTunnelSource<T, SplitT, StateT> source,
+                             Map<Integer, List<byte[]>> restoredState,
+                             int parallelism) {
+        this.source = source;
+        this.restoredState = restoredState;
+        this.parallelism = parallelism;
+        this.splitSerializer = source.getSplitSerializer();
+        this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
+
+        this.coordinatedEnumeratorContext = new 
CoordinatedEnumeratorContext<>(this);
+        this.readerContextMap = new ConcurrentHashMap<>(parallelism);
+        this.readerRunningMap = new ConcurrentHashMap<>(parallelism);
+        try {
+            createSplitEnumerator();
+            createReaders();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void createSplitEnumerator() throws Exception {
+        if (restoredState != null && restoredState.size() > 0) {
+            StateT restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+            splitEnumerator = 
source.restoreEnumerator(coordinatedEnumeratorContext, restoredEnumeratorState);
+            restoredState.forEach((subtaskId, splitBytes) -> {
+                if (subtaskId == -1) {
+                    return;
+                }
+                List<SplitT> restoredSplitState = new 
ArrayList<>(splitBytes.size());
+                for (byte[] splitByte : splitBytes) {
+                    try {
+                        
restoredSplitState.add(splitSerializer.deserialize(splitByte));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                restoredSplitStateMap.put(subtaskId, restoredSplitState);
+            });
+        } else {
+            splitEnumerator = 
source.createEnumerator(coordinatedEnumeratorContext);
+        }
+    }
+
+    private void createReaders() throws Exception {
+        for (int subtaskId = 0; subtaskId < this.parallelism; subtaskId++) {
+            CoordinatedReaderContext readerContext = new 
CoordinatedReaderContext(this, source.getBoundedness(), subtaskId);
+            readerContextMap.put(subtaskId, readerContext);
+            SourceReader<T, SplitT> reader = 
source.createReader(readerContext);
+            readerMap.put(subtaskId, reader);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        executorService = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(parallelism, 
"parallel-split-enumerator-executor");
+        splitEnumerator.open();
+        restoredSplitStateMap.forEach((subtaskId, splits) -> {
+            splitEnumerator.addSplitsBack(splits, subtaskId);
+        });
+        readerMap.entrySet().parallelStream().forEach(entry -> {
+            try {
+                entry.getValue().open();
+                splitEnumerator.registerReader(entry.getKey());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    public void run(Collector<T> collector) throws Exception {
+        readerMap.entrySet().parallelStream().forEach(entry -> {
+            final AtomicBoolean flag = readerRunningMap.get(entry.getKey());
+            final SourceReader<T, SplitT> reader = entry.getValue();
+            executorService.execute(() -> {
+                while (flag.get()) {
+                    try {
+                        reader.pollNext(collector);
+                    } catch (Exception e) {
+                        running = false;
+                        flag.set(false);
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        });
+        splitEnumerator.run();
+        while (running) {
+            Thread.sleep(SLEEP_TIME_INTERVAL);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        running = false;
+
+        for (Map.Entry<Integer, SourceReader<T, SplitT>> entry : 
readerMap.entrySet()) {
+            readerRunningMap.get(entry.getKey()).set(false);
+            entry.getValue().close();
+        }
+
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+
+        try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator) {
+            // just close the resources
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Checkpoint & state
+    // 
--------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws 
Exception {
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
+        Map<Integer, List<byte[]>> allStates = readerMap.entrySet()
+            .parallelStream()
+            .collect(Collectors.toMap(
+                Map.Entry<Integer, SourceReader<T, SplitT>>::getKey,
+                readerEntry -> {
+                    try {
+                        List<SplitT> splitStates = 
readerEntry.getValue().snapshotState(checkpointId);
+                        final List<byte[]> rawValues = new 
ArrayList<>(splitStates.size());
+                        for (SplitT splitState : splitStates) {
+                            
rawValues.add(splitSerializer.serialize(splitState));
+                        }
+                        return rawValues;
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }));
+        allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        return allStates;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        splitEnumerator.notifyCheckpointComplete(checkpointId);
+        readerMap.values().parallelStream().forEach(reader -> {
+            try {
+                reader.notifyCheckpointComplete(checkpointId);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        splitEnumerator.notifyCheckpointAborted(checkpointId);
+        readerMap.values().parallelStream().forEach(reader -> {
+            try {
+                reader.notifyCheckpointAborted(checkpointId);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Reader context methods
+    // 
--------------------------------------------------------------------------------------------
+
+    protected void handleNoMoreElement(int subtaskId) {
+        readerRunningMap.get(subtaskId).set(false);
+        readerContextMap.remove(subtaskId);
+    }
+
+    protected void handleSplitRequest(int subtaskId) {
+        splitEnumerator.handleSplitRequest(subtaskId);
+    }
+
+    protected void handleReaderEvent(int subtaskId, SourceEvent event) {
+        splitEnumerator.handleSourceEvent(subtaskId, event);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Enumerator context methods
+    // 
--------------------------------------------------------------------------------------------
+
+    public int currentReaderCount() {
+        return readerContextMap.size();
+    }
+
+    public Set<Integer> registeredReaders() {
+        return readerMap.keySet();
+    }
+
+    protected void addSplits(int subtaskId, List<SplitT> splits) {
+        readerMap.get(subtaskId).addSplits(splits);
+    }
+
+    protected void handleNoMoreSplits(int subtaskId) {
+        readerMap.get(subtaskId).handleNoMoreSplits();
+    }
+
+    protected void handleEnumeratorEvent(int subtaskId, SourceEvent event) {
+        readerMap.get(subtaskId).handleSourceEvent(event);
+    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 9080c578..24c082cf 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -47,7 +47,6 @@ public class ParallelReaderContext implements 
SourceReader.Context {
 
     @Override
     public void signalNoMoreElement() {
-        // todo: if we have multiple subtasks, we need to know if all subtask 
is stopped
         parallelSource.handleNoMoreElement();
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 83e61f78..1f25d736 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -23,19 +23,18 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.state.CheckpointListener;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
-public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements 
AutoCloseable, CheckpointListener {
-
-    private final long splitEnumeratorTimeInterval = 5L;
+public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements 
BaseSourceFunction<T> {
 
     protected final SeaTunnelSource<T, SplitT, StateT> source;
     protected final ParallelEnumeratorContext<SplitT> 
parallelEnumeratorContext;
@@ -48,8 +47,8 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
 
     protected final List<SplitT> restoredSplitState;
 
-    protected transient volatile SourceSplitEnumerator<SplitT, StateT> 
splitEnumerator;
-    protected transient volatile SourceReader<T, SplitT> reader;
+    protected final SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
+    protected final SourceReader<T, SplitT> reader;
     protected transient volatile ScheduledThreadPoolExecutor executorService;
 
     /**
@@ -58,7 +57,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
     private volatile boolean running = true;
 
     public ParallelSource(SeaTunnelSource<T, SplitT, StateT> source,
-                          List<byte[]> restoredState,
+                          Map<Integer, List<byte[]>> restoredState,
                           int parallelism,
                           int subtaskId) {
         this.source = source;
@@ -73,10 +72,10 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
         // Create or restore split enumerator & reader
         try {
             if (restoredState != null && restoredState.size() > 0) {
-                StateT restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(0));
-                restoredSplitState = new ArrayList<>(restoredState.size());
-                for (int i = 1; i < restoredState.size(); i++) {
-                    
restoredSplitState.add(splitSerializer.deserialize(restoredState.get(i)));
+                StateT restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+                restoredSplitState = new 
ArrayList<>(restoredState.get(subtaskId).size());
+                for (byte[] splitBytes : restoredState.get(subtaskId)) {
+                    
restoredSplitState.add(splitSerializer.deserialize(splitBytes));
                 }
 
                 splitEnumerator = 
source.restoreEnumerator(parallelEnumeratorContext, restoredEnumeratorState);
@@ -90,17 +89,19 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
         }
     }
 
-    private transient volatile Thread splitEnumeratorThread;
-
+    @Override
     public void open() throws Exception {
         executorService = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, 
String.format("parallel-split-enumerator-executor-%s", subtaskId));
         splitEnumerator.open();
-        splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+        if (restoredSplitState.size() > 0) {
+            splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+        }
         reader.open();
         parallelEnumeratorContext.register();
         splitEnumerator.registerReader(subtaskId);
     }
 
+    @Override
     public void run(Collector<T> collector) throws Exception {
         Future<?> future = executorService.submit(() -> {
             try {
@@ -161,16 +162,22 @@ public class ParallelSource<T, SplitT extends 
SourceSplit, StateT> implements Au
         reader.handleNoMoreSplits();
     }
 
-    public List<byte[]> snapshotState(long checkpointId) throws Exception {
-        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
-        byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
+    // 
--------------------------------------------------------------------------------------------
+    // Checkpoint & state
+    // 
--------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws 
Exception {
+        byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(splitEnumerator.snapshotState(checkpointId));
         List<SplitT> splitStates = reader.snapshotState(checkpointId);
-        final List<byte[]> rawValues = new ArrayList<>(splitStates.size() + 1);
-        rawValues.add(enumeratorStateBytes);
+        final List<byte[]> readerStateBytes = new 
ArrayList<>(splitStates.size());
         for (SplitT splitState : splitStates) {
-            rawValues.add(splitSerializer.serialize(splitState));
+            readerStateBytes.add(splitSerializer.serialize(splitState));
         }
-        return rawValues;
+        Map<Integer, List<byte[]>> allStates = new HashMap<>(2);
+        allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        allStates.put(subtaskId, readerStateBytes);
+        return allStates;
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
deleted file mode 100644
index 363956ad..00000000
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.state;
-
-import org.apache.seatunnel.api.state.CheckpointLock;
-
-public class EmptyLock implements CheckpointLock {
-    @Override
-    public void lock() {
-        // nothing
-    }
-
-    @Override
-    public void unlock() {
-        // nothing
-    }
-}
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 64%
copy from 
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
copy to 
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 48374567..1778596f 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -21,77 +21,70 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-import org.apache.seatunnel.translation.source.ParallelSource;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
 
 import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
+public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row>
     implements CheckpointListener, ResultTypeQueryable<Row>, 
CheckpointedFunction {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
-    protected static final String PARALLEL_SOURCE_STATE_NAME = 
"parallel-source-states";
+    private static final Logger LOG = 
LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
 
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
-    protected transient volatile ParallelSource<SeaTunnelRow, ?, ?> 
parallelSource;
+    protected transient volatile BaseSourceFunction<SeaTunnelRow> 
internalSource;
 
-    protected transient ListState<byte[]> sourceState;
-    protected transient volatile List<byte[]> restoredState;
+    protected transient MapState<Integer, List<byte[]>> sourceState;
+    protected transient volatile Map<Integer, List<byte[]>> restoredState = 
new HashMap<>();
 
     /**
      * Flag indicating whether the consumer is still running.
      */
     private volatile boolean running = true;
 
-    private FlinkCheckpointLock checkpointLock = null;
-
-    public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) 
{
-        // TODO: Make sure the source is uncoordinated.
+    public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source) {
         this.source = source;
     }
 
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        this.parallelSource = new ParallelSource<>(source,
-                restoredState,
-                getRuntimeContext().getNumberOfParallelSubtasks(),
-                getRuntimeContext().getIndexOfThisSubtask());
-        this.parallelSource.open();
+        this.internalSource = createInternalSource();
+        this.internalSource.open();
     }
 
+    protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
+
     @Override
     public void run(SourceFunction.SourceContext<Row> sourceContext) throws 
Exception {
-        checkpointLock = new 
FlinkCheckpointLock(sourceContext.getCheckpointLock(), 
getRuntimeContext().getIndexOfThisSubtask());
-        parallelSource.run(new RowCollector(sourceContext, checkpointLock));
+        internalSource.run(new RowCollector(sourceContext, 
sourceContext.getCheckpointLock()));
     }
 
     @Override
     public void cancel() {
         running = false;
         try {
-            parallelSource.close();
-            if (checkpointLock != null) {
-                checkpointLock.close();
-            }
+            internalSource.close();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -99,12 +92,12 @@ public class SeaTunnelParallelSource extends 
RichParallelSourceFunction<Row>
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        parallelSource.notifyCheckpointComplete(checkpointId);
+        internalSource.notifyCheckpointComplete(checkpointId);
     }
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        parallelSource.notifyCheckpointAborted(checkpointId);
+        internalSource.notifyCheckpointAborted(checkpointId);
     }
 
     @Override
@@ -120,22 +113,26 @@ public class SeaTunnelParallelSource extends 
RichParallelSourceFunction<Row>
         if (!running) {
             LOG.debug("snapshotState() called on closed source");
         } else {
-            
sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
+            sourceState.clear();
+            
sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
         }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext 
initializeContext) throws Exception {
-        OperatorStateStore stateStore = 
initializeContext.getOperatorStateStore();
-        this.sourceState = stateStore.getListState(new 
ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.sourceState = initializeContext.getKeyedStateStore()
+            .getMapState(new MapStateDescriptor<>(
+                getStateName(),
+                BasicTypeInfo.INT_TYPE_INFO,
+                
Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
         if (initializeContext.isRestored()) {
-            restoredState = new ArrayList<>();
             // populate actual holder for restored state
-            sourceState.get().forEach(restoredState::add);
-
+            sourceState.entries().forEach(entry -> 
restoredState.put(entry.getKey(), entry.getValue()));
             LOG.info("Consumer subtask {} restored state", 
getRuntimeContext().getIndexOfThisSubtask());
         } else {
             LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask());
         }
     }
+
+    protected abstract String getStateName();
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
deleted file mode 100644
index 59c10514..00000000
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.state.CheckpointLock;
-import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
-
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FlinkCheckpointLock implements CheckpointLock, AutoCloseable {
-
-    private static final Integer LOOP_INTERVAL = 10;
-    private final Object flinkLock;
-    private final AtomicInteger lock = new AtomicInteger(0);
-    private volatile boolean engineLock = false;
-    private volatile boolean running = true;
-    private final ScheduledThreadPoolExecutor executor;
-    public FlinkCheckpointLock(Object flinkLock, int subtaskId) {
-        this.flinkLock = flinkLock;
-        this.executor = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, 
String.format("checkpoint-lock-monitor-%s", subtaskId));
-        executor.execute(this::monitorLock);
-    }
-
-    private void monitorLock() {
-        if (!running) {
-            return;
-        }
-        while (lock.get() == 0 && running) {
-            sleep();
-        }
-        if (!running) {
-            return;
-        }
-        synchronized (flinkLock) {
-            engineLock = true;
-            while (lock.get() != 0 && running) {
-                sleep();
-            }
-        }
-        engineLock = false;
-        monitorLock();
-    }
-
-    @Override
-    public void lock() {
-        lock.incrementAndGet();
-        while (!engineLock) {
-            sleep();
-        }
-    }
-
-    @Override
-    public void unlock() {
-        int num = lock.decrementAndGet();
-        while (engineLock && num == 0) {
-            sleep();
-        }
-    }
-
-    private void sleep() {
-        try {
-            Thread.sleep(LOOP_INTERVAL);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        running = false;
-        executor.shutdown();
-    }
-}
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
index 1541cf31..f9727926 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.translation.flink.source;
 
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.state.CheckpointLock;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
 
@@ -31,9 +30,9 @@ public class RowCollector implements Collector<SeaTunnelRow> {
 
     protected final SourceFunction.SourceContext<Row> internalCollector;
     protected final FlinkRowSerialization rowSerialization = new 
FlinkRowSerialization();
-    protected final CheckpointLock checkpointLock;
+    protected final Object checkpointLock;
 
-    public RowCollector(SourceFunction.SourceContext<Row> internalCollector, 
CheckpointLock checkpointLock) {
+    public RowCollector(SourceFunction.SourceContext<Row> internalCollector, 
Object checkpointLock) {
         this.internalCollector = internalCollector;
         this.checkpointLock = checkpointLock;
     }
@@ -48,7 +47,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
     }
 
     @Override
-    public CheckpointLock getCheckpointLock() {
+    public Object getCheckpointLock() {
         return this.checkpointLock;
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
new file mode 100644
index 00000000..a6992341
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+
+public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
+
+    protected static final String COORDINATED_SOURCE_STATE_NAME = 
"coordinated-source-states";
+
+    public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source) {
+        // TODO: Make sure the source is coordinated.
+        super(source);
+    }
+
+    @Override
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+        return new CoordinatedSource<>(source, restoredState, 
getRuntimeContext().getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    protected String getStateName() {
+        return COORDINATED_SOURCE_STATE_NAME;
+    }
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index 48374567..c3f8bfeb 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -19,123 +19,31 @@ package org.apache.seatunnel.translation.flink.source;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
 import org.apache.seatunnel.translation.source.ParallelSource;
 
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction 
implements ParallelSourceFunction<Row> {
 
-public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
-    implements CheckpointListener, ResultTypeQueryable<Row>, 
CheckpointedFunction {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
     protected static final String PARALLEL_SOURCE_STATE_NAME = 
"parallel-source-states";
 
-    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
-    protected transient volatile ParallelSource<SeaTunnelRow, ?, ?> 
parallelSource;
-
-    protected transient ListState<byte[]> sourceState;
-    protected transient volatile List<byte[]> restoredState;
-
-    /**
-     * Flag indicating whether the consumer is still running.
-     */
-    private volatile boolean running = true;
-
-    private FlinkCheckpointLock checkpointLock = null;
-
     public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) 
{
         // TODO: Make sure the source is uncoordinated.
-        this.source = source;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        this.parallelSource = new ParallelSource<>(source,
-                restoredState,
-                getRuntimeContext().getNumberOfParallelSubtasks(),
-                getRuntimeContext().getIndexOfThisSubtask());
-        this.parallelSource.open();
-    }
-
-    @Override
-    public void run(SourceFunction.SourceContext<Row> sourceContext) throws 
Exception {
-        checkpointLock = new 
FlinkCheckpointLock(sourceContext.getCheckpointLock(), 
getRuntimeContext().getIndexOfThisSubtask());
-        parallelSource.run(new RowCollector(sourceContext, checkpointLock));
+        super(source);
     }
 
     @Override
-    public void cancel() {
-        running = false;
-        try {
-            parallelSource.close();
-            if (checkpointLock != null) {
-                checkpointLock.close();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+        return new ParallelSource<>(source,
+            restoredState,
+            getRuntimeContext().getNumberOfParallelSubtasks(),
+            getRuntimeContext().getIndexOfThisSubtask());
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        parallelSource.notifyCheckpointComplete(checkpointId);
-    }
-
-    @Override
-    public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        parallelSource.notifyCheckpointAborted(checkpointId);
-    }
-
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        SeaTunnelRowTypeInfo rowTypeInfo = source.getRowTypeInfo();
-        TypeInformation<?>[] typeInformation = 
Arrays.stream(rowTypeInfo.getSeaTunnelDataTypes())
-            
.map(TypeConverterUtils::convertType).toArray(TypeInformation[]::new);
-        return new RowTypeInfo(typeInformation, rowTypeInfo.getFieldNames());
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext snapshotContext) throws 
Exception {
-        if (!running) {
-            LOG.debug("snapshotState() called on closed source");
-        } else {
-            
sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext 
initializeContext) throws Exception {
-        OperatorStateStore stateStore = 
initializeContext.getOperatorStateStore();
-        this.sourceState = stateStore.getListState(new 
ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
-        if (initializeContext.isRestored()) {
-            restoredState = new ArrayList<>();
-            // populate actual holder for restored state
-            sourceState.get().forEach(restoredState::add);
-
-            LOG.info("Consumer subtask {} restored state", 
getRuntimeContext().getIndexOfThisSubtask());
-        } else {
-            LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask());
-        }
+    protected String getStateName() {
+        return PARALLEL_SOURCE_STATE_NAME;
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
index b5678eb8..88e1da93 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.translation.spark.source;
 
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.state.CheckpointLock;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
 
@@ -27,10 +26,10 @@ import org.apache.spark.sql.types.StructType;
 
 public class InternalRowCollector implements Collector<SeaTunnelRow> {
     private final Handover<InternalRow> handover;
-    private final CheckpointLock checkpointLock;
+    private final Object checkpointLock;
     private final InternalRowSerialization rowSerialization;
 
-    public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock 
checkpointLock, StructType sparkSchema) {
+    public InternalRowCollector(Handover<InternalRow> handover, Object 
checkpointLock, StructType sparkSchema) {
         this.handover = handover;
         this.checkpointLock = checkpointLock;
         this.rowSerialization = new InternalRowSerialization(sparkSchema);
@@ -39,15 +38,16 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
     @Override
     public void collect(SeaTunnelRow record) {
         try {
-            // TODO: Lock InternalRowCollector while checkpoint is running
-            handover.produce(rowSerialization.serialize(record));
+            synchronized (checkpointLock) {
+                handover.produce(rowSerialization.serialize(record));
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public CheckpointLock getCheckpointLock() {
+    public Object getCheckpointLock() {
         return this.checkpointLock;
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
index 45765a36..5eed6425 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
@@ -20,19 +20,20 @@ package org.apache.seatunnel.translation.spark.source;
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
 
 import java.util.List;
+import java.util.Map;
 
 public class ReaderState implements PartitionOffset {
-    private final List<byte[]> bytes;
+    private final Map<Integer, List<byte[]>> bytes;
     private final Integer subtaskId;
     private final Integer checkpointId;
 
-    public ReaderState(List<byte[]> bytes, Integer subtaskId, Integer 
checkpointId) {
+    public ReaderState(Map<Integer, List<byte[]>> bytes, Integer subtaskId, 
Integer checkpointId) {
         this.bytes = bytes;
         this.subtaskId = subtaskId;
         this.checkpointId = checkpointId;
     }
 
-    public List<byte[]> getBytes() {
+    public Map<Integer, List<byte[]>> getBytes() {
         return bytes;
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 09b969b1..5d388e82 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.translation.spark.source;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.SerializationUtils;
-import 
org.apache.seatunnel.translation.spark.source.batch.BatchParallelSourceReader;
-import 
org.apache.seatunnel.translation.spark.source.continnous.ContinuousParallelSourceReader;
-import 
org.apache.seatunnel.translation.spark.source.micro.MicroBatchParallelSourceReader;
+import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
+import 
org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
+import 
org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +38,13 @@ import 
org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
 public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, 
MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(SeaTunnelSourceSupport.class);
     public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnelSource";
     public static final Integer CHECKPOINT_INTERVAL_DEFAULT = 10000;
 
@@ -54,9 +56,8 @@ public class SeaTunnelSourceSupport implements DataSourceV2, 
ReadSupport, MicroB
     @Override
     public DataSourceReader createReader(StructType rowType, DataSourceOptions 
options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
-        Integer parallelism = options.getInt("source.parallelism", 1);
-        // TODO: case coordinated source
-        return new BatchParallelSourceReader(seaTunnelSource, parallelism, 
rowType);
+        int parallelism = options.getInt("source.parallelism", 1);
+        return new BatchSourceReader(seaTunnelSource, parallelism, rowType);
     }
 
     @Override
@@ -73,10 +74,9 @@ public class SeaTunnelSourceSupport implements DataSourceV2, 
ReadSupport, MicroB
         String checkpointPath = StringUtils.replacePattern(checkpointLocation, 
"sources/\\d+", "sources-state");
         Configuration configuration = 
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
         String hdfsRoot = 
options.get("hdfs.root").orElse(FileSystem.getDefaultUri(configuration).toString());
-        String hdfsUser = options.get("hdfs.user").orElseThrow(() -> new 
UnsupportedOperationException("HDFS user is required."));
+        String hdfsUser = options.get("hdfs.user").orElse("");
         Integer checkpointId = options.getInt("checkpoint.id", 1);
-        // TODO: case coordinated source
-        return new MicroBatchParallelSourceReader(seaTunnelSource, 
parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, 
hdfsUser, rowType);
+        return new MicroBatchSourceReader(seaTunnelSource, parallelism, 
checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser, rowType);
     }
 
     @Override
@@ -84,8 +84,7 @@ public class SeaTunnelSourceSupport implements DataSourceV2, 
ReadSupport, MicroB
         StructType rowType = checkRowType(rowTypeOptional);
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
         Integer parallelism = options.getInt("source.parallelism", 1);
-        // TODO: case coordinated source
-        return new ContinuousParallelSourceReader(seaTunnelSource, 
parallelism, rowType);
+        return new ContinuousSourceReader(seaTunnelSource, parallelism, 
rowType);
     }
 
     private SeaTunnelSource<SeaTunnelRow, ?, ?> 
getSeaTunnelSource(DataSourceOptions options) {
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
index 321f8ae8..d8c050c1 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.source.batch;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -40,6 +41,10 @@ public class BatchPartition implements 
InputPartition<InternalRow> {
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new BatchPartitionReader(source, parallelism, subtaskId, 
rowType);
+        if (source instanceof SupportCoordinate) {
+            return new CoordinatedBatchPartitionReader(source, parallelism, 
subtaskId, rowType);
+        } else {
+            return new ParallelBatchPartitionReader(source, parallelism, 
subtaskId, rowType);
+        }
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
similarity index 72%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
rename to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
index 0498bf28..d0a26d80 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.source.batch;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,13 +29,13 @@ import org.apache.spark.sql.types.StructType;
 import java.util.ArrayList;
 import java.util.List;
 
-public class BatchParallelSourceReader implements DataSourceReader {
+public class BatchSourceReader implements DataSourceReader {
 
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
     protected final StructType rowType;
 
-    public BatchParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, StructType rowType) {
+    public BatchSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.rowType = rowType;
@@ -47,9 +48,15 @@ public class BatchParallelSourceReader implements 
DataSourceReader {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
-        List<InputPartition<InternalRow>> virtualPartitions = new 
ArrayList<>(parallelism);
-        for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
-            virtualPartitions.add(new BatchPartition(source, parallelism, 
subtaskId, rowType));
+        List<InputPartition<InternalRow>> virtualPartitions;
+        if (source instanceof SupportCoordinate) {
+            virtualPartitions = new ArrayList<>(1);
+            virtualPartitions.add(new BatchPartition(source, parallelism, 0, 
rowType));
+        } else {
+            virtualPartitions = new ArrayList<>(parallelism);
+            for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
+                virtualPartitions.add(new BatchPartition(source, parallelism, 
subtaskId, rowType));
+            }
         }
         return virtualPartitions;
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
new file mode 100644
index 00000000..00c2cd71
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source.batch;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReader {
+
+    protected final Map<Integer, InternalRowCollector> collectorMap;
+
+    public CoordinatedBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, StructType rowType) {
+        super(source, parallelism, subtaskId, rowType);
+        this.collectorMap = new HashMap<>(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            collectorMap.put(i, new InternalRowCollector(handover, new 
Object(), rowType));
+        }
+    }
+
+    @Override
+    protected String getEnumeratorThreadName() {
+        return "coordinated-split-enumerator-executor";
+    }
+
+    @Override
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+        return new InternalCoordinatedSource<>(source,
+            null,
+            parallelism);
+    }
+
+    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT> 
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
+        protected final AtomicInteger completedReader = new AtomicInteger(0);
+
+        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
+            super(source, restoredState, parallelism);
+        }
+
+        @Override
+        public void run(Collector<SeaTunnelRow> collector) throws Exception {
+            readerMap.entrySet().parallelStream().forEach(entry -> {
+                final AtomicBoolean flag = 
readerRunningMap.get(entry.getKey());
+                final SourceReader<SeaTunnelRow, SplitT> reader = 
entry.getValue();
+                final Collector<SeaTunnelRow> rowCollector = 
collectorMap.get(entry.getKey());
+                executorService.execute(() -> {
+                    while (flag.get()) {
+                        try {
+                            reader.pollNext(rowCollector);
+                        } catch (Exception e) {
+                            this.running = false;
+                            flag.set(false);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            });
+            splitEnumerator.run();
+            while (this.running) {
+                Thread.sleep(SLEEP_TIME_INTERVAL);
+            }
+        }
+
+        @Override
+        protected void handleNoMoreElement(int subtaskId) {
+            super.handleNoMoreElement(subtaskId);
+            if (completedReader.incrementAndGet() == this.parallelism) {
+                CoordinatedBatchPartitionReader.this.running = false;
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
similarity index 76%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
rename to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
index 9f5beeb5..44320228 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
@@ -17,14 +17,13 @@
 
 package org.apache.seatunnel.translation.spark.source.batch;
 
-import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
 import org.apache.seatunnel.translation.source.ParallelSource;
 import org.apache.seatunnel.translation.spark.source.Handover;
 import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
-import org.apache.seatunnel.translation.state.EmptyLock;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -35,11 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-public class BatchPartitionReader implements InputPartitionReader<InternalRow> 
{
+public class ParallelBatchPartitionReader implements 
InputPartitionReader<InternalRow> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(BatchPartitionReader.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
 
     protected static final Integer INTERVAL = 100;
 
@@ -51,21 +51,26 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
     protected final ExecutorService executorService;
     protected final Handover<InternalRow> handover;
 
+    protected final Object checkpointLock = new Object();
+
     protected volatile boolean running = true;
     protected volatile boolean prepare = true;
 
-    protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
-    protected volatile Collector<SeaTunnelRow> collector;
+    protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
 
-    public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, Integer subtaskId, StructType rowType) {
+    public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
         this.rowType = rowType;
-        this.executorService = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, 
String.format("parallel-split-enumerator-executor-%s", subtaskId));
+        this.executorService = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, 
getEnumeratorThreadName());
         this.handover = new Handover<>();
     }
 
+    protected String getEnumeratorThreadName() {
+        return String.format("parallel-split-enumerator-executor-%s", 
subtaskId);
+    }
+
     @Override
     public boolean next() throws IOException {
         prepare();
@@ -83,31 +88,27 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
         if (!prepare) {
             return;
         }
-        this.collector = createCollector();
-        this.parallelSource = createParallelSource();
+
+        this.internalSource = createInternalSource();
         try {
-            this.parallelSource.open();
+            this.internalSource.open();
         } catch (Exception e) {
             running = false;
             throw new RuntimeException("");
         }
         executorService.execute(() -> {
             try {
-                parallelSource.run(collector);
+                internalSource.run(new InternalRowCollector(handover, 
checkpointLock, rowType));
             } catch (Exception e) {
                 handover.reportError(e);
-                LOGGER.error("ParallelSource execute failed.", e);
+                LOGGER.error("BatchPartitionReader execute failed.", e);
                 running = false;
             }
         });
         prepare = false;
     }
 
-    protected Collector<SeaTunnelRow> createCollector() {
-        return new InternalRowCollector(handover, new EmptyLock(), rowType);
-    }
-
-    protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
         return new InternalParallelSource<>(source,
                 null,
                 parallelism,
@@ -126,13 +127,17 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
     @Override
     public void close() throws IOException {
         running = false;
-        parallelSource.close();
+        try {
+            internalSource.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         executorService.shutdown();
     }
 
     public class InternalParallelSource<SplitT extends SourceSplit, StateT> 
extends ParallelSource<SeaTunnelRow, SplitT, StateT> {
 
-        public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, List<byte[]> restoredState, int parallelism, int subtaskId) {
+        public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism, int 
subtaskId) {
             super(source, restoredState, parallelism, subtaskId);
         }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
index 52f0a9d9..e0c8c915 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
@@ -26,6 +26,7 @@ import 
org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.List;
+import java.util.Map;
 
 public class ContinuousPartition implements InputPartition<InternalRow> {
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -33,14 +34,14 @@ public class ContinuousPartition implements 
InputPartition<InternalRow> {
     protected final Integer subtaskId;
     protected final StructType rowType;
     protected final Integer checkpointId;
-    protected final List<byte[]> restoredState;
+    protected final Map<Integer, List<byte[]>> restoredState;
 
     public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
                                Integer parallelism,
                                Integer subtaskId,
                                StructType rowType,
                                Integer checkpointId,
-                               List<byte[]> restoredState) {
+                               Map<Integer, List<byte[]>> restoredState) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
@@ -51,6 +52,6 @@ public class ContinuousPartition implements 
InputPartition<InternalRow> {
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new ContinuousPartitionReader(source, parallelism, subtaskId, 
rowType, checkpointId, restoredState);
+        return new ParallelContinuousPartitionReader(source, parallelism, 
subtaskId, rowType, checkpointId, restoredState);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
similarity index 92%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
rename to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
index 1c1f05ac..d4f1db31 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
@@ -35,19 +35,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-public class ContinuousParallelSourceReader implements ContinuousReader {
+public class ContinuousSourceReader implements ContinuousReader {
 
     private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     private final Integer parallelism;
     private final StructType rowType;
-    private Map<Integer, ReaderState> readerStateMap = new HashMap<>();
+    private final Map<Integer, ReaderState> readerStateMap = new HashMap<>();
     private CoordinationState coordinationState;
     private int checkpointId = 1;
 
-    public ContinuousParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, StructType rowType) {
+    public ContinuousSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.rowType = rowType;
+        throw new UnsupportedOperationException("Continuous source is not 
currently supported.");
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
similarity index 75%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
rename to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
index 180d3ef2..1276421a 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.source.ParallelSource;
 import org.apache.seatunnel.translation.spark.source.ReaderState;
-import 
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
+import 
org.apache.seatunnel.translation.spark.source.batch.ParallelBatchPartitionReader;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import 
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
@@ -30,19 +30,20 @@ import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
-public class ContinuousPartitionReader extends BatchPartitionReader implements 
ContinuousInputPartitionReader<InternalRow> {
+public class ParallelContinuousPartitionReader extends 
ParallelBatchPartitionReader implements 
ContinuousInputPartitionReader<InternalRow> {
     protected volatile Integer checkpointId;
-    protected final List<byte[]> restoredState;
+    protected final Map<Integer, List<byte[]>> restoredState;
 
-    public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, StructType rowType, Integer 
checkpointId, List<byte[]> restoredState) {
+    public ParallelContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, 
?> source, Integer parallelism, Integer subtaskId, StructType rowType, Integer 
checkpointId, Map<Integer, List<byte[]>> restoredState) {
         super(source, parallelism, subtaskId, rowType);
         this.checkpointId = checkpointId;
         this.restoredState = restoredState;
     }
 
     @Override
-    protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+    protected ParallelSource<SeaTunnelRow, ?, ?> createInternalSource() {
         return new InternalParallelSource<>(source,
                 restoredState,
                 parallelism,
@@ -51,9 +52,9 @@ public class ContinuousPartitionReader extends 
BatchPartitionReader implements C
 
     @Override
     public PartitionOffset getOffset() {
-        List<byte[]> bytes;
+        Map<Integer, List<byte[]>> bytes;
         try {
-            bytes = parallelSource.snapshotState(checkpointId);
+            bytes = internalSource.snapshotState(checkpointId);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -66,7 +67,7 @@ public class ContinuousPartitionReader extends 
BatchPartitionReader implements C
      * The method is called by RPC
      */
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        parallelSource.notifyCheckpointComplete(checkpointId);
+        internalSource.notifyCheckpointComplete(checkpointId);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
new file mode 100644
index 00000000..c2482588
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source.micro;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+import org.apache.seatunnel.translation.spark.source.ReaderState;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPartitionReader {
+    protected final Map<Integer, InternalRowCollector> collectorMap;
+
+    public CoordinatedMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, 
?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType, 
Integer checkpointId, Integer checkpointInterval, String checkpointPath, String 
hdfsRoot, String hdfsUser) {
+        super(source, parallelism, subtaskId, rowType, checkpointId, 
checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
+        this.collectorMap = new HashMap<>(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            collectorMap.put(i, new InternalRowCollector(handover, new 
Object(), rowType));
+        }
+    }
+
+    @Override
+    public void virtualCheckpoint() {
+        try {
+            internalCheckpoint(collectorMap.values().iterator(), 0);
+        } catch (Exception e) {
+            throw new RuntimeException("An error occurred in virtual 
checkpoint execution.", e);
+        }
+    }
+
+    private void internalCheckpoint(Iterator<InternalRowCollector> iterator, 
int loop) throws Exception {
+        if (!iterator.hasNext()) {
+            return;
+        }
+        synchronized (iterator.next().getCheckpointLock()) {
+            internalCheckpoint(iterator, ++loop);
+            if (loop != this.parallelism) {
+                // Avoid backtracking calls
+                return;
+            }
+            while (!handover.isEmpty()) {
+                Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+            }
+            // Block #next() method
+            synchronized (handover) {
+                final int currentCheckpoint = checkpointId;
+                ReaderState readerState = snapshotState();
+                saveState(readerState, currentCheckpoint);
+                internalSource.notifyCheckpointComplete(currentCheckpoint);
+                running = false;
+            }
+        }
+    }
+
+    @Override
+    protected String getEnumeratorThreadName() {
+        return "coordinated-split-enumerator-executor";
+    }
+
+    @Override
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+        return new InternalCoordinatedSource<>(source,
+            null,
+            parallelism);
+    }
+
+    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT> 
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
+        protected final AtomicInteger completedReader = new AtomicInteger(0);
+
+        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
+            super(source, restoredState, parallelism);
+        }
+
+        @Override
+        public void run(Collector<SeaTunnelRow> collector) throws Exception {
+            readerMap.entrySet().parallelStream().forEach(entry -> {
+                final AtomicBoolean flag = 
readerRunningMap.get(entry.getKey());
+                final SourceReader<SeaTunnelRow, SplitT> reader = 
entry.getValue();
+                final Collector<SeaTunnelRow> rowCollector = 
collectorMap.get(entry.getKey());
+                executorService.execute(() -> {
+                    while (flag.get()) {
+                        try {
+                            reader.pollNext(rowCollector);
+                        } catch (Exception e) {
+                            this.running = false;
+                            flag.set(false);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            });
+            splitEnumerator.run();
+            while (this.running) {
+                Thread.sleep(SLEEP_TIME_INTERVAL);
+            }
+        }
+
+        @Override
+        protected void handleNoMoreElement(int subtaskId) {
+            super.handleNoMoreElement(subtaskId);
+            if (completedReader.incrementAndGet() == this.parallelism) {
+                CoordinatedMicroBatchPartitionReader.this.running = false;
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
index 82457f82..e0636d2a 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.source.micro;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -58,6 +59,10 @@ public class MicroBatchPartition implements 
InputPartition<InternalRow> {
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new MicroBatchPartitionReader(source, parallelism, subtaskId, 
rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
+        if (source instanceof SupportCoordinate) {
+            return new CoordinatedMicroBatchPartitionReader(source, 
parallelism, subtaskId, rowType, checkpointId, checkpointInterval, 
checkpointPath, hdfsRoot, hdfsUser);
+        } else {
+            return new ParallelMicroBatchPartitionReader(source, parallelism, 
subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, 
hdfsUser);
+        }
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
similarity index 77%
rename from 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
rename to 
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
index 0e12d6d2..a2128251 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.source.micro;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
@@ -31,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-public class MicroBatchParallelSourceReader implements MicroBatchReader {
+public class MicroBatchSourceReader implements MicroBatchReader {
 
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
@@ -45,7 +46,7 @@ public class MicroBatchParallelSourceReader implements 
MicroBatchReader {
     protected MicroBatchState startOffset;
     protected MicroBatchState endOffset;
 
-    public MicroBatchParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer checkpointId, Integer checkpointInterval, 
String checkpointPath, String hdfsRoot, String hdfsUser, StructType rowType) {
+    public MicroBatchSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, Integer checkpointId, Integer checkpointInterval, String 
checkpointPath, String hdfsRoot, String hdfsUser, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.checkpointId = checkpointId;
@@ -95,9 +96,15 @@ public class MicroBatchParallelSourceReader implements MicroBatchReader {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
-        List<InputPartition<InternalRow>> virtualPartitions = new ArrayList<>(parallelism);
-        for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
-            virtualPartitions.add(new MicroBatchPartition(source, parallelism, subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser));
+        List<InputPartition<InternalRow>> virtualPartitions;
+        if (source instanceof SupportCoordinate) {
+            virtualPartitions = new ArrayList<>(1);
+            virtualPartitions.add(new MicroBatchPartition(source, parallelism, 0, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser));
+        } else {
+            virtualPartitions = new ArrayList<>(parallelism);
+            for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
+                virtualPartitions.add(new MicroBatchPartition(source, parallelism, subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser));
+            }
         }
         checkpointId++;
         return virtualPartitions;
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
similarity index 70%
rename from seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
rename to seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
index 28f74e8c..33a1427b 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
@@ -17,17 +17,15 @@
 
 package org.apache.seatunnel.translation.spark.source.micro;
 
-import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.state.CheckpointLock;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.source.ParallelSource;
-import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
 import org.apache.seatunnel.translation.spark.source.ReaderState;
-import org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
+import org.apache.seatunnel.translation.spark.source.batch.ParallelBatchPartitionReader;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,33 +37,33 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public class MicroBatchPartitionReader extends BatchPartitionReader {
+public class ParallelMicroBatchPartitionReader extends ParallelBatchPartitionReader {
     protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;
     protected volatile Integer checkpointId;
-
-    protected final CheckpointLock checkpointLock = new SingleReaderCheckpointLock();
     protected final Integer checkpointInterval;
     protected final String checkpointPath;
     protected final String hdfsRoot;
     protected final String hdfsUser;
 
-    protected List<byte[]> restoredState;
+    protected Map<Integer, List<byte[]>> restoredState;
     protected ScheduledThreadPoolExecutor executor;
     protected FileSystem fileSystem;
 
-    public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
-                                     Integer parallelism,
-                                     Integer subtaskId,
-                                     StructType rowType,
-                                     Integer checkpointId,
-                                     Integer checkpointInterval,
-                                     String checkpointPath,
-                                     String hdfsRoot,
-                                     String hdfsUser) {
+    public ParallelMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
+                                             Integer parallelism,
+                                             Integer subtaskId,
+                                             StructType rowType,
+                                             Integer checkpointId,
+                                             Integer checkpointInterval,
+                                             String checkpointPath,
+                                             String hdfsRoot,
+                                             String hdfsUser) {
         super(source, parallelism, subtaskId, rowType);
         this.checkpointId = checkpointId;
         this.checkpointInterval = checkpointInterval;
@@ -75,24 +73,17 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
     }
 
     @Override
-    protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
         return new InternalParallelSource<>(source,
             restoredState,
             parallelism,
             subtaskId);
     }
 
-    @Override
-    protected Collector<SeaTunnelRow> createCollector() {
-        return new InternalRowCollector(handover, checkpointLock, rowType);
-    }
-
     @Override
     protected void prepare() {
-        Configuration configuration = new Configuration();
-        configuration.set("fs.defaultFS", hdfsRoot);
         try {
-            this.fileSystem = FileSystem.get(new URI(hdfsRoot), configuration, hdfsUser);
+            this.fileSystem = getFileSystem();
             this.restoredState = restoreState(checkpointId - 1);
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -101,10 +92,20 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
         prepareCheckpoint();
     }
 
+    protected FileSystem getFileSystem() throws URISyntaxException, IOException, InterruptedException {
+        Configuration configuration = new Configuration();
+        configuration.set("fs.defaultFS", hdfsRoot);
+        if (StringUtils.isNotBlank(hdfsUser)) {
+            return FileSystem.get(new URI(hdfsRoot), configuration, hdfsUser);
+        } else {
+            return FileSystem.get(new URI(hdfsRoot), configuration);
+        }
+    }
+
     protected ReaderState snapshotState() {
-        List<byte[]> bytes;
+        Map<Integer, List<byte[]>> bytes;
         try {
-            bytes = parallelSource.snapshotState(checkpointId);
+            bytes = internalSource.snapshotState(checkpointId);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -117,9 +118,8 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
     }
 
     public void virtualCheckpoint() {
-        checkpointLock.lock();
         try {
-            synchronized (collector) {
+            synchronized (checkpointLock) {
                 while (!handover.isEmpty()) {
                     Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
                 }
@@ -128,19 +128,17 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
                     final int currentCheckpoint = checkpointId;
                     ReaderState readerState = snapshotState();
                     saveState(readerState, currentCheckpoint);
-                    parallelSource.notifyCheckpointComplete(currentCheckpoint);
+                    internalSource.notifyCheckpointComplete(currentCheckpoint);
                     running = false;
                 }
             }
         } catch (Exception e) {
-            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
-        } finally {
-            checkpointLock.unlock();
         }
     }
 
-    private List<byte[]> restoreState(int checkpointId) throws IOException {
-        Path hdfsPath = new Path(checkpointPath + File.separator + checkpointId);
+    private Map<Integer, List<byte[]>> restoreState(int checkpointId) throws IOException {
+        Path hdfsPath = getCheckpointPathWithId(checkpointId);
         if (!fileSystem.exists(hdfsPath)) {
             return null;
         }
@@ -159,9 +157,9 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
         }
     }
 
-    private void saveState(ReaderState readerState, int checkpointId) throws IOException {
+    protected void saveState(ReaderState readerState, int checkpointId) throws IOException {
         byte[] bytes = SerializationUtils.serialize(readerState);
-        Path hdfsPath = new Path(checkpointPath + File.separator + checkpointId);
+        Path hdfsPath = getCheckpointPathWithId(checkpointId);
         if (!fileSystem.exists(hdfsPath)) {
             fileSystem.createNewFile(hdfsPath);
         }
@@ -173,6 +171,10 @@ public class MicroBatchPartitionReader extends BatchPartitionReader {
         }
     }
 
+    private Path getCheckpointPathWithId(int checkpointId) {
+        return new Path(this.checkpointPath + File.separator + this.subtaskId + File.separator + checkpointId);
+    }
+
     @Override
     public void close() throws IOException {
         fileSystem.close();

Reply via email to