This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5acda5a14 [flink-1.13] esured checkpoint enable (#2178)
5acda5a14 is described below
commit 5acda5a149c03bfbc83f3aada0665990d3e99c57
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Jul 15 09:58:59 2022 +0800
[flink-1.13] esured checkpoint enable (#2178)
---
.../translation/flink/source/BaseSeaTunnelSourceFunction.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 88b0ee719..911c9b0a3 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,8 +84,11 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
// Wait for a checkpoint to complete:
// In the current version(version < 1.14.0), when the operator state
of the source changes to FINISHED, jobs cannot be checkpoint executed.
final long prevCheckpointId = latestTriggerCheckpointId.get();
- while (running && prevCheckpointId >=
latestCompletedCheckpointId.get()) {
- Thread.sleep(100);
+ // Ensured Checkpoint enabled
+ if (getRuntimeContext() instanceof StreamingRuntimeContext &&
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
+ while (running && prevCheckpointId >=
latestCompletedCheckpointId.get()) {
+ Thread.sleep(100);
+ }
}
}