This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 605a0861c [hotfix][api-draft] Coordinated source cannot be stopped in 
offline job (#2040)
605a0861c is described below

commit 605a0861c67c3bfb5d0ecc6456bf1fd7d7a582e5
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Jun 21 17:39:25 2022 +0800

    [hotfix][api-draft] Coordinated source cannot be stopped in offline job 
(#2040)
---
 .../org/apache/seatunnel/translation/source/CoordinatedSource.java  | 6 ++++++
 .../spark/source/batch/CoordinatedBatchPartitionReader.java         | 2 --
 .../spark/source/micro/CoordinatedMicroBatchPartitionReader.java    | 2 --
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 7c6f6043d..549ce2464 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> 
implements BaseSourceFunction<T> {
@@ -54,6 +55,7 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, 
StateT> implements
     protected transient volatile SourceSplitEnumerator<SplitT, StateT> 
splitEnumerator;
     protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new 
ConcurrentHashMap<>();
     protected final Map<Integer, AtomicBoolean> readerRunningMap;
+    protected final AtomicInteger completedReader = new AtomicInteger(0);
     protected transient volatile ScheduledThreadPoolExecutor executorService;
 
     /**
@@ -108,6 +110,7 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT> implements
         for (int subtaskId = 0; subtaskId < this.parallelism; subtaskId++) {
             CoordinatedReaderContext readerContext = new 
CoordinatedReaderContext(this, source.getBoundedness(), subtaskId);
             readerContextMap.put(subtaskId, readerContext);
+            readerRunningMap.put(subtaskId, new AtomicBoolean(true));
             SourceReader<T, SplitT> reader = 
source.createReader(readerContext);
             readerMap.put(subtaskId, reader);
         }
@@ -231,6 +234,9 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT> implements
     protected void handleNoMoreElement(int subtaskId) {
         readerRunningMap.get(subtaskId).set(false);
         readerContextMap.remove(subtaskId);
+        if (completedReader.incrementAndGet() == this.parallelism) {
+            this.running = false;
+        }
     }
 
     protected void handleSplitRequest(int subtaskId) {
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 2c00aafe6..235e26a13 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -32,7 +32,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReader {
 
@@ -59,7 +58,6 @@ public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReade
     }
 
     public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT> 
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
-        protected final AtomicInteger completedReader = new AtomicInteger(0);
 
         public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
             super(source, restoredState, parallelism);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index 4df4d585b..d5e18baa7 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -34,7 +34,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPartitionReader {
     protected final Map<Integer, InternalRowCollector> collectorMap;
@@ -93,7 +92,6 @@ public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPart
     }
 
     public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT> 
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
-        protected final AtomicInteger completedReader = new AtomicInteger(0);
 
         public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, 
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
             super(source, restoredState, parallelism);

Reply via email to