[FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs
This closes #5121. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e637c54 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e637c54 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e637c54 Branch: refs/heads/release-1.4 Commit: 1e637c54c2ad1b9a8d9ad6d3f9c8aa55605d7e8e Parents: 6884ce9 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Dec 18 14:57:01 2017 -0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Jan 5 22:01:59 2018 -0800 ---------------------------------------------------------------------- .../api/functions/source/SourceFunction.java | 48 +++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1e637c54/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index cb2e15f..5a15df7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -41,15 +41,19 @@ import java.io.Serializable; * elements are not done concurrently. This is achieved by using the provided checkpointing lock * object to protect update of state and emission of elements in a synchronized block. * - * <p>This is the basic pattern one should follow when implementing a (checkpointed) source: + * <p>This is the basic pattern one should follow when implementing a checkpointed source: * * <pre>{@code - * public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction { + * public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction { * private long count = 0L; * private volatile boolean isRunning = true; * + * private transient ListState<Long> checkpointedCount; + * * public void run(SourceContext<T> ctx) { * while (isRunning && count < 1000) { + * // this synchronized block ensures that state checkpointing, + * // internal state updates and emission of elements are an atomic operation * synchronized (ctx.getCheckpointLock()) { * ctx.collect(count); * count++; @@ -61,9 +65,22 @@ import java.io.Serializable; * isRunning = false; * } * - * public void snapshotState(FunctionSnapshotContext context) { } + * public void initializeState(FunctionInitializationContext context) { + * this.checkpointedCount = context + * .getOperatorStateStore() + * .getListState(new ListStateDescriptor<>("count", Long.class)); + * + * if (context.isRestored()) { + * for (Long count : this.checkpointedCount.get()) { + * this.count = count; + * } + * } + * } * - * public void initializeState(FunctionInitializationContext context) { } + * public void snapshotState(FunctionSnapshotContext context) { + * this.checkpointedCount.clear(); + * this.checkpointedCount.add(count); + * } * } * }</pre> * @@ -101,12 +118,16 @@ public interface SourceFunction<T> extends Function, Serializable { * state and emitting elements, to make both an atomic operation: * * <pre>{@code - * public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> { + * public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction { * private long count = 0L; * private volatile boolean isRunning = true; * + * private transient ListState<Long> checkpointedCount; + * * public void run(SourceContext<T> ctx) { * while (isRunning && count < 1000) { + * // this synchronized block ensures that state checkpointing, + * // internal state updates and emission of elements are an atomic operation * synchronized (ctx.getCheckpointLock()) { * ctx.collect(count); * count++; @@ -118,9 +139,22 @@ public interface SourceFunction<T> extends Function, Serializable { * isRunning = false; * } * - * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; } + * public void initializeState(FunctionInitializationContext context) { + * this.checkpointedCount = context + * .getOperatorStateStore() + * .getListState(new ListStateDescriptor<>("count", Long.class)); + * + * if (context.isRestored()) { + * for (Long count : this.checkpointedCount.get()) { + * this.count = count; + * } + * } + * } * - * public void restoreState(Long state) { this.count = state; } + * public void snapshotState(FunctionSnapshotContext context) { + * this.checkpointedCount.clear(); + * this.checkpointedCount.add(count); + * } * } * }</pre> *
