This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 556012371 [api-draft][flink-1.13] Ensure checkpoint execution for all data (#1988)
556012371 is described below
commit 5560123716a4b2f638d7fb71401bdfcca8e53331
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Jul 14 21:47:43 2022 +0800
[api-draft][flink-1.13] Ensure checkpoint execution for all data (#1988)
In current Flink versions (< 1.14.0), once the operator state of the source
changes to FINISHED, the job can no longer execute checkpoints.
---
.../flink/source/BaseSeaTunnelSourceFunction.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 93f6c3c6e..88b0ee719 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row>
implements CheckpointListener, ResultTypeQueryable<Row>,
CheckpointedFunction {
@@ -54,6 +55,9 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
protected transient volatile Map<Integer, List<byte[]>> restoredState =
new HashMap<>();
+ protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
+ protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+
/**
* Flag indicating whether the consumer is still running.
*/
@@ -72,9 +76,16 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
+ @SuppressWarnings("checkstyle:MagicNumber")
@Override
public void run(SourceFunction.SourceContext<Row> sourceContext) throws
Exception {
internalSource.run(new RowCollector(sourceContext,
sourceContext.getCheckpointLock(), source.getProducedType()));
+ // Wait for a checkpoint to complete:
+ // In the current version(version < 1.14.0), when the operator state of the source changes to FINISHED, jobs cannot be checkpoint executed.
+ final long prevCheckpointId = latestTriggerCheckpointId.get();
+ while (running && prevCheckpointId >= latestCompletedCheckpointId.get()) {
+ Thread.sleep(100);
+ }
}
@Override
@@ -90,6 +101,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
internalSource.notifyCheckpointComplete(checkpointId);
+ latestCompletedCheckpointId.set(checkpointId);
}
@Override
@@ -105,11 +117,13 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
@Override
public void snapshotState(FunctionSnapshotContext snapshotContext) throws
Exception {
+ final long checkpointId = snapshotContext.getCheckpointId();
+ latestTriggerCheckpointId.set(checkpointId);
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
sourceState.clear();
- sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+ sourceState.add(internalSource.snapshotState(checkpointId));
}
}