[ https://issues.apache.org/jira/browse/FLINK-5163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715165#comment-15715165 ]

ASF GitHub Bot commented on FLINK-5163:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90641176
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -18,25 +18,44 @@
     package org.apache.flink.streaming.api.functions.source;
     
     import org.apache.flink.annotation.PublicEvolving;
    -import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.ArrayDeque;
    +import java.util.Deque;
     
     /**
      * A stateful streaming source that emits each number from a given interval exactly once,
      * possibly in parallel.
    + * <p>
    + * For the source to be re-scalable, the first time the job is run, we precompute all the elements
    + * that each of the tasks should emit and upon checkpointing, each element constitutes its own
    + * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
    + * <p>
    + * This strategy guarantees that each element will be emitted exactly-once, but elements will not
    + * necessarily be emitted in ascending order, even for the same tasks.
      */
     @PublicEvolving
    -public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
    +public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
        
        private static final long serialVersionUID = 1L;
     
        private final long start;
        private final long end;
     
    -   private long collected;
    -
        private volatile boolean isRunning = true;
     
    +   private transient Deque<Long> valuesToEmit;
    +
    +   private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";
    --- End diff --
    
    Instead of this you could also have
    ```
    private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
        new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
    ```
    
    and then use this descriptor directly later on, instead of creating a new one from this name field.
    
    That's just a personal style nitpick. Your version is also fine. 😃 
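The rescaling strategy described in the Javadoc above can be illustrated without Flink. The following is a minimal plain-Java sketch, not the code from the pull request: the class and method names (`SequenceRescaleSketch`, `initialValues`, `redistribute`) are hypothetical, the strided initial split (subtask `i` of `p` takes `start + i`, `start + i + p`, ...) is an assumption, and the round-robin reassignment only stands in for Flink's actual state repartitioning, which the Javadoc describes as random.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, Flink-free simulation of the rescaling strategy from the Javadoc.
// Assumptions (not taken from the PR): a strided initial split per subtask, and a
// round-robin reassignment of checkpointed elements on rescale.
class SequenceRescaleSketch {

    // First run: precompute every value that subtask `taskIdx` (out of `parallelism`)
    // should emit from the closed interval [start, end].
    // Assumes end + parallelism does not overflow a long.
    static Deque<Long> initialValues(long start, long end, int taskIdx, int parallelism) {
        Deque<Long> values = new ArrayDeque<>();
        for (long v = start + taskIdx; v <= end; v += parallelism) {
            values.add(v);
        }
        return values;
    }

    // Restore after rescaling: each not-yet-emitted value in the snapshots is treated
    // as its own single-element partition and dealt out round-robin to the new subtasks.
    static List<Deque<Long>> redistribute(List<Deque<Long>> snapshots, int newParallelism) {
        List<Deque<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayDeque<>());
        }
        int next = 0;
        for (Deque<Long> snapshot : snapshots) {
            for (Long value : snapshot) {
                result.get(next).add(value);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }
}
```

For example, with the interval [1, 10], three subtasks that each emit one element before a checkpoint, and a rescale to two subtasks, the first new subtask is left holding 4, 10, 8, 9: every remaining element is still owned by exactly one subtask, but not in ascending order, which is the trade-off the Javadoc mentions.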


> Make the production functions rescalable (apart from the Rolling/Bucketing Sinks)
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5163
>                 URL: https://issues.apache.org/jira/browse/FLINK-5163
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.2.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> This issue targets porting all the functions in the production code to the new state abstractions. These functions are:
> 1) StatefulSequenceSource
> 2) MessageAcknowledgingSourceBase
> 3) FromElementsFunction
> 4) ContinuousFileMonitoringFunction



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
