This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 605a0861c [hotfix][api-draft] Coordinated source cannot be stopped in
offline job (#2040)
605a0861c is described below
commit 605a0861c67c3bfb5d0ecc6456bf1fd7d7a582e5
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Jun 21 17:39:25 2022 +0800
[hotfix][api-draft] Coordinated source cannot be stopped in offline job
(#2040)
---
.../org/apache/seatunnel/translation/source/CoordinatedSource.java | 6 ++++++
.../spark/source/batch/CoordinatedBatchPartitionReader.java | 2 --
.../spark/source/micro/CoordinatedMicroBatchPartitionReader.java | 2 --
3 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 7c6f6043d..549ce2464 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class CoordinatedSource<T, SplitT extends SourceSplit, StateT>
implements BaseSourceFunction<T> {
@@ -54,6 +55,7 @@ public class CoordinatedSource<T, SplitT extends SourceSplit,
StateT> implements
protected transient volatile SourceSplitEnumerator<SplitT, StateT>
splitEnumerator;
protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new
ConcurrentHashMap<>();
protected final Map<Integer, AtomicBoolean> readerRunningMap;
+ protected final AtomicInteger completedReader = new AtomicInteger(0);
protected transient volatile ScheduledThreadPoolExecutor executorService;
/**
@@ -108,6 +110,7 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT> implements
for (int subtaskId = 0; subtaskId < this.parallelism; subtaskId++) {
CoordinatedReaderContext readerContext = new
CoordinatedReaderContext(this, source.getBoundedness(), subtaskId);
readerContextMap.put(subtaskId, readerContext);
+ readerRunningMap.put(subtaskId, new AtomicBoolean(true));
SourceReader<T, SplitT> reader =
source.createReader(readerContext);
readerMap.put(subtaskId, reader);
}
@@ -231,6 +234,9 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT> implements
protected void handleNoMoreElement(int subtaskId) {
readerRunningMap.get(subtaskId).set(false);
readerContextMap.remove(subtaskId);
+ if (completedReader.incrementAndGet() == this.parallelism) {
+ this.running = false;
+ }
}
protected void handleSplitRequest(int subtaskId) {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 2c00aafe6..235e26a13 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -32,7 +32,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
public class CoordinatedBatchPartitionReader extends
ParallelBatchPartitionReader {
@@ -59,7 +58,6 @@ public class CoordinatedBatchPartitionReader extends
ParallelBatchPartitionReade
}
public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT>
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
- protected final AtomicInteger completedReader = new AtomicInteger(0);
public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT,
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
super(source, restoredState, parallelism);
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index 4df4d585b..d5e18baa7 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
public class CoordinatedMicroBatchPartitionReader extends
ParallelMicroBatchPartitionReader {
protected final Map<Integer, InternalRowCollector> collectorMap;
@@ -93,7 +92,6 @@ public class CoordinatedMicroBatchPartitionReader extends
ParallelMicroBatchPart
}
public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT>
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
- protected final AtomicInteger completedReader = new AtomicInteger(0);
public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT,
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
super(source, restoredState, parallelism);