[FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs

This closes #5121.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e637c54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e637c54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e637c54

Branch: refs/heads/release-1.4
Commit: 1e637c54c2ad1b9a8d9ad6d3f9c8aa55605d7e8e
Parents: 6884ce9
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Dec 18 14:57:01 2017 -0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Jan 5 22:01:59 2018 -0800

----------------------------------------------------------------------
 .../api/functions/source/SourceFunction.java    | 48 +++++++++++++++++---
 1 file changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e637c54/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index cb2e15f..5a15df7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -41,15 +41,19 @@ import java.io.Serializable;
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
  *
- * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
+ * <p>This is the basic pattern one should follow when implementing a checkpointed source:
  *
  * <pre>{@code
- *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction {
+ *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
+ *      private transient ListState<Long> checkpointedCount;
+ *
  *      public void run(SourceContext<T> ctx) {
  *          while (isRunning && count < 1000) {
+ *              // this synchronized block ensures that state checkpointing,
+ *              // internal state updates and emission of elements are an atomic operation
  *              synchronized (ctx.getCheckpointLock()) {
  *                  ctx.collect(count);
  *                  count++;
@@ -61,9 +65,22 @@ import java.io.Serializable;
  *          isRunning = false;
  *      }
  *
- *      public void snapshotState(FunctionSnapshotContext context) {  }
+ *      public void initializeState(FunctionInitializationContext context) {
+ *          this.checkpointedCount = context
+ *              .getOperatorStateStore()
+ *              .getListState(new ListStateDescriptor<>("count", Long.class));
+ *
+ *          if (context.isRestored()) {
+ *              for (Long count : this.checkpointedCount.get()) {
+ *                  this.count = count;
+ *              }
+ *          }
+ *      }
  *
- *      public void initializeState(FunctionInitializationContext context) {  }
+ *      public void snapshotState(FunctionSnapshotContext context) {
+ *          this.checkpointedCount.clear();
+ *          this.checkpointedCount.add(count);
+ *      }
  * }
  * }</pre>
  *
@@ -101,12 +118,16 @@ public interface SourceFunction<T> extends Function, Serializable {
         * state and emitting elements, to make both an atomic operation:
         *
         * <pre>{@code
-        *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> {
+        *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
         *      private long count = 0L;
         *      private volatile boolean isRunning = true;
         *
+        *      private transient ListState<Long> checkpointedCount;
+        *
         *      public void run(SourceContext<T> ctx) {
         *          while (isRunning && count < 1000) {
+        *              // this synchronized block ensures that state checkpointing,
+        *              // internal state updates and emission of elements are an atomic operation
         *              synchronized (ctx.getCheckpointLock()) {
         *                  ctx.collect(count);
         *                  count++;
@@ -118,9 +139,22 @@ public interface SourceFunction<T> extends Function, Serializable {
         *          isRunning = false;
         *      }
         *
-        *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+        *      public void initializeState(FunctionInitializationContext context) {
+        *          this.checkpointedCount = context
+        *              .getOperatorStateStore()
+        *              .getListState(new ListStateDescriptor<>("count", Long.class));
+        *
+        *          if (context.isRestored()) {
+        *              for (Long count : this.checkpointedCount.get()) {
+        *                  this.count = count;
+        *              }
+        *          }
+        *      }
         *
-        *      public void restoreState(Long state) { this.count = state; }
+        *      public void snapshotState(FunctionSnapshotContext context) {
+        *          this.checkpointedCount.clear();
+        *          this.checkpointedCount.add(count);
+        *      }
         * }
         * }</pre>
         *

Reply via email to