This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 556012371 [api-draft][flink-1.13] Ensure checkpoint execution for all data (#1988)
556012371 is described below

commit 5560123716a4b2f638d7fb71401bdfcca8e53331
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Jul 14 21:47:43 2022 +0800

    [api-draft][flink-1.13] Ensure checkpoint execution for all data (#1988)
    
    In the current Flink version (< 1.14.0), once the operator state of the source
    changes to FINISHED, the job can no longer execute checkpoints.
---
 .../flink/source/BaseSeaTunnelSourceFunction.java        | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 93f6c3c6e..88b0ee719 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row>
     implements CheckpointListener, ResultTypeQueryable<Row>, 
CheckpointedFunction {
@@ -54,6 +55,9 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
     protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
     protected transient volatile Map<Integer, List<byte[]>> restoredState = 
new HashMap<>();
 
+    protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
+    protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+
     /**
      * Flag indicating whether the consumer is still running.
      */
@@ -72,9 +76,16 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
 
     protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void run(SourceFunction.SourceContext<Row> sourceContext) throws 
Exception {
         internalSource.run(new RowCollector(sourceContext, 
sourceContext.getCheckpointLock(), source.getProducedType()));
+        // Wait for a checkpoint to complete:
+        // In the current version(version < 1.14.0), when the operator state 
of the source changes to FINISHED, jobs cannot be checkpoint executed.
+        final long prevCheckpointId = latestTriggerCheckpointId.get();
+        while (running && prevCheckpointId >= 
latestCompletedCheckpointId.get()) {
+            Thread.sleep(100);
+        }
     }
 
     @Override
@@ -90,6 +101,7 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         internalSource.notifyCheckpointComplete(checkpointId);
+        latestCompletedCheckpointId.set(checkpointId);
     }
 
     @Override
@@ -105,11 +117,13 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
 
     @Override
     public void snapshotState(FunctionSnapshotContext snapshotContext) throws 
Exception {
+        final long checkpointId = snapshotContext.getCheckpointId();
+        latestTriggerCheckpointId.set(checkpointId);
         if (!running) {
             LOG.debug("snapshotState() called on closed source");
         } else {
             sourceState.clear();
-            
sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+            sourceState.add(internalSource.snapshotState(checkpointId));
         }
     }
 

Reply via email to