Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90641176
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -18,25 +18,44 @@
     package org.apache.flink.streaming.api.functions.source;
     
     import org.apache.flink.annotation.PublicEvolving;
    -import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.ArrayDeque;
    +import java.util.Deque;
     
     /**
      * A stateful streaming source that emits each number from a given interval exactly once,
      * possibly in parallel.
    + * <p>
    + * For the source to be re-scalable, the first time the job is run, we precompute all the elements
    + * that each of the tasks should emit and upon checkpointing, each element constitutes its own
    + * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
    + * <p>
    + * This strategy guarantees that each element will be emitted exactly-once, but elements will not
    + * necessarily be emitted in ascending order, even for the same tasks.
      */
     @PublicEvolving
    -public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
    +public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
        
        private static final long serialVersionUID = 1L;
     
        private final long start;
        private final long end;
     
    -   private long collected;
    -
        private volatile boolean isRunning = true;
     
    +   private transient Deque<Long> valuesToEmit;
    +
    +   private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";
    --- End diff --
    
    Instead of this, you could also have
    ```
    private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
        new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
    ```
    
    and then use this descriptor directly later on, instead of initialising a new descriptor with the name stored in this field.
    
    That's just a personal style nitpick. Your version is also fine. 😃 
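The rescaling strategy described in the class javadoc (precompute each task's elements, store every remaining element as its own list-state entry, re-assign entries on rescale) can be illustrated without Flink. The following is a plain-Java sketch, not the PR's code: the round-robin split by subtask index in the hypothetical `initialValues` helper and the round-robin hand-out in the hypothetical `redistribute` helper are assumptions (the javadoc only says partitions are "randomly re-assigned"), and no Flink API is used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SequenceRescaleSketch {

    // Hypothetical helper: precompute the values one subtask should emit,
    // ASSUMING a round-robin split of [start, end] by subtask index.
    static Deque<Long> initialValues(long start, long end, int taskIdx, int parallelism) {
        Deque<Long> values = new ArrayDeque<>();
        for (long v = start + taskIdx; v <= end; v += parallelism) {
            values.add(v);
        }
        return values;
    }

    // Models a snapshot: every not-yet-emitted element becomes its own
    // list-state entry (one "partition" per element).
    static List<Long> snapshot(Deque<Long> remaining) {
        return new ArrayList<>(remaining);
    }

    // Models rescaling: gather the entries of all old tasks and hand them to
    // the new tasks. Round-robin is an ASSUMPTION; the runtime decides the real assignment.
    static List<Deque<Long>> redistribute(List<List<Long>> snapshots, int newParallelism) {
        List<Deque<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            tasks.add(new ArrayDeque<>());
        }
        int next = 0;
        for (List<Long> snapshot : snapshots) {
            for (Long value : snapshot) {
                tasks.get(next).add(value);
                next = (next + 1) % newParallelism;
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        long start = 1, end = 10;
        int oldParallelism = 2;
        List<List<Long>> snapshots = new ArrayList<>();
        for (int t = 0; t < oldParallelism; t++) {
            Deque<Long> values = initialValues(start, end, t, oldParallelism);
            values.poll(); // each task emits one element before the checkpoint
            snapshots.add(snapshot(values));
        }
        // Restore the two snapshots into three tasks.
        List<Deque<Long>> rescaled = redistribute(snapshots, 3);
        System.out.println(rescaled);
    }
}
```

Running `main` prints `[[3, 9, 8], [5, 4, 10], [7, 6]]`: every remaining element appears exactly once across the new tasks, but a single task may hold them out of order (9 before 8), which is the trade-off the javadoc points out.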

