This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 18e94e44d1ebda41ad8595eac03393ebb17bdfbf Author: Stephan Ewen <[email protected]> AuthorDate: Tue Aug 24 15:36:12 2021 +0200 [hotfix] Deduplicate JavaDocs in SourceFunction --- .../api/functions/source/SourceFunction.java | 55 ++++------------------ 1 file changed, 10 insertions(+), 45 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 90c83b6..0cd513d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.watermark.Watermark; @@ -98,53 +99,17 @@ import java.io.Serializable; public interface SourceFunction<T> extends Function, Serializable { /** - * Starts the source. Implementations can use the {@link SourceContext} emit elements. + * Starts the source. Implementations use the {@link SourceContext} to emit elements. Sources + * that checkpoint their state for fault tolerance should use the {@link + * SourceContext#getCheckpointLock() checkpoint lock} to ensure consistency between the + * bookkeeping and emitting the elements. 
* - * <p>Sources that implement {@link - * org.apache.flink.streaming.api.checkpoint.CheckpointedFunction} must lock on the checkpoint - * lock (using a synchronized block) before updating internal state and emitting elements, to - * make both an atomic operation: + * <p>Sources that implement {@link CheckpointedFunction} must lock on the {@link + * SourceContext#getCheckpointLock() checkpoint lock} (using a synchronized + * block) before updating internal state and emitting elements, to make both an atomic + * operation. * - * <pre>{@code - * public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction { - * private long count = 0L; - * private volatile boolean isRunning = true; - * - * private transient ListState<Long> checkpointedCount; - * - * public void run(SourceContext<T> ctx) { - * while (isRunning && count < 1000) { - * // this synchronized block ensures that state checkpointing, - * // internal state updates and emission of elements are an atomic operation - * synchronized (ctx.getCheckpointLock()) { - * ctx.collect(count); - * count++; - * } - * } - * } - * - * public void cancel() { - * isRunning = false; - * } - * - * public void initializeState(FunctionInitializationContext context) { - * this.checkpointedCount = context - * .getOperatorStateStore() - * .getListState(new ListStateDescriptor<>("count", Long.class)); - * - * if (context.isRestored()) { - * for (Long count : this.checkpointedCount.get()) { - * this.count = count; - * } - * } - * } - * - * public void snapshotState(FunctionSnapshotContext context) { - * this.checkpointedCount.clear(); - * this.checkpointedCount.add(count); - * } - * } - * }</pre> + * <p>Refer to the {@link SourceFunction top-level class docs} for an example. * * @param ctx The context to emit elements to and for accessing locks. */
