This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18e94e44d1ebda41ad8595eac03393ebb17bdfbf
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Aug 24 15:36:12 2021 +0200

    [hotfix] Deduplicate JavaDocs in SourceFunction
---
 .../api/functions/source/SourceFunction.java       | 55 ++++------------------
 1 file changed, 10 insertions(+), 45 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 90c83b6..0cd513d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
@@ -98,53 +99,17 @@ import java.io.Serializable;
 public interface SourceFunction<T> extends Function, Serializable {
 
     /**
-     * Starts the source. Implementations can use the {@link SourceContext} 
emit elements.
+     * Starts the source. Implementations use the {@link SourceContext} to 
emit elements. Sources
+     * that checkpoint their state for fault tolerance should use the {@link
+     * SourceContext#getCheckpointLock()} checkpoint lock} to ensure 
consistency between the
+     * bookkeeping and emitting the elements.
      *
-     * <p>Sources that implement {@link
-     * org.apache.flink.streaming.api.checkpoint.CheckpointedFunction} must 
lock on the checkpoint
-     * lock (using a synchronized block) before updating internal state and 
emitting elements, to
-     * make both an atomic operation:
+     * <p>Sources that implement {@link CheckpointedFunction} must lock on the 
{@link
+     * SourceContext#getCheckpointLock()} checkpoint lock} checkpoint lock 
(using a synchronized
+     * block) before updating internal state and emitting elements, to make 
both an atomic
+     * operation.
      *
-     * <pre>{@code
-     *  public class ExampleCountSource implements SourceFunction<Long>, 
CheckpointedFunction {
-     *      private long count = 0L;
-     *      private volatile boolean isRunning = true;
-     *
-     *      private transient ListState<Long> checkpointedCount;
-     *
-     *      public void run(SourceContext<T> ctx) {
-     *          while (isRunning && count < 1000) {
-     *              // this synchronized block ensures that state 
checkpointing,
-     *              // internal state updates and emission of elements are an 
atomic operation
-     *              synchronized (ctx.getCheckpointLock()) {
-     *                  ctx.collect(count);
-     *                  count++;
-     *              }
-     *          }
-     *      }
-     *
-     *      public void cancel() {
-     *          isRunning = false;
-     *      }
-     *
-     *      public void initializeState(FunctionInitializationContext context) 
{
-     *          this.checkpointedCount = context
-     *              .getOperatorStateStore()
-     *              .getListState(new ListStateDescriptor<>("count", 
Long.class));
-     *
-     *          if (context.isRestored()) {
-     *              for (Long count : this.checkpointedCount.get()) {
-     *                  this.count = count;
-     *              }
-     *          }
-     *      }
-     *
-     *      public void snapshotState(FunctionSnapshotContext context) {
-     *          this.checkpointedCount.clear();
-     *          this.checkpointedCount.add(count);
-     *      }
-     * }
-     * }</pre>
+     * <p>Refer to the {@link SourceFunction top-level class docs} for an 
example.
      *
      * @param ctx The context to emit elements to and for accessing locks.
      */

Reply via email to