This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5acda5a14 [flink-1.13] esured checkpoint enable (#2178)
5acda5a14 is described below

commit 5acda5a149c03bfbc83f3aada0665990d3e99c57
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Jul 15 09:58:59 2022 +0800

    [flink-1.13] esured checkpoint enable (#2178)
---
 .../translation/flink/source/BaseSeaTunnelSourceFunction.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 88b0ee719..911c9b0a3 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,8 +84,11 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
         // Wait for a checkpoint to complete:
         // In the current version(version < 1.14.0), when the operator state 
of the source changes to FINISHED, jobs cannot be checkpoint executed.
         final long prevCheckpointId = latestTriggerCheckpointId.get();
-        while (running && prevCheckpointId >= 
latestCompletedCheckpointId.get()) {
-            Thread.sleep(100);
+        // Ensured Checkpoint enabled
+        if (getRuntimeContext() instanceof StreamingRuntimeContext && 
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
+            while (running && prevCheckpointId >= 
latestCompletedCheckpointId.get()) {
+                Thread.sleep(100);
+            }
         }
     }
 

Reply via email to