[ https://issues.apache.org/jira/browse/FLINK-5163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715165#comment-15715165 ]
ASF GitHub Bot commented on FLINK-5163:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2871#discussion_r90641176
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -18,25 +18,44 @@
package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
/**
* A stateful streaming source that emits each number from a given interval exactly once,
* possibly in parallel.
+ * <p>
+ * For the source to be re-scalable, the first time the job is run, we precompute all the elements
+ * that each of the tasks should emit and upon checkpointing, each element constitutes its own
+ * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
+ * <p>
+ * This strategy guarantees that each element will be emitted exactly-once, but elements will not
+ * necessarily be emitted in ascending order, even for the same tasks.
*/
@PublicEvolving
-public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
+public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private final long start;
private final long end;
- private long collected;
-
private volatile boolean isRunning = true;
+ private transient Deque<Long> valuesToEmit;
+
+ private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";
--- End diff --
Instead of this you could also have
```
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
        new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
```
and then use this descriptor directly later on, instead of constructing a new one from this name field.
That's just a personal style nitpick. Your version is also fine. 😃
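The Javadoc in the diff describes the rescaling strategy only in prose. The following is a minimal plain-Java sketch of it that does not depend on Flink: each subtask precomputes its share of `[start, end]`, the values it has not yet emitted are snapshotted as one list entry per value, and on restore the pooled entries are handed out to the new subtasks. Assumptions: the striping of values by subtask index (the precomputation is not part of the diff excerpt), the class and method names, and the round-robin redistribution are all hypothetical stand-ins; the Javadoc only says partitions are "randomly re-assigned", and Flink's own operator-state repartitioning is not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, Flink-free model of the rescalable sequence source strategy.
public class SequenceRescaleSketch {

    // Fresh start: precompute the values one subtask will emit.
    // Assumption: values are striped across subtasks as start + index, start + index + p, ...
    static Deque<Long> initialValues(long start, long end, int parallelism, int subtaskIndex) {
        Deque<Long> values = new ArrayDeque<>();
        for (long v = start + subtaskIndex; v <= end; v += parallelism) {
            values.add(v);
        }
        return values;
    }

    // Checkpoint: every value not yet emitted becomes its own list-state entry.
    static List<Long> snapshot(Deque<Long> remaining) {
        return new ArrayList<>(remaining);
    }

    // Restore after rescaling: pool all entries and deal them round-robin to the
    // new subtasks (a deterministic stand-in for the real repartitioning).
    static List<Deque<Long>> redistribute(List<List<Long>> snapshots, int newParallelism) {
        List<Deque<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayDeque<>());
        }
        int next = 0;
        for (List<Long> entries : snapshots) {
            for (Long value : entries) {
                result.get(next).add(value);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // First run: range [0, 9] with parallelism 3.
        List<Deque<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(initialValues(0, 9, 3, i));
        }

        // Each subtask emits one value, then a checkpoint is taken.
        List<List<Long>> checkpoint = new ArrayList<>();
        for (Deque<Long> task : tasks) {
            System.out.println("emitted " + task.poll());
            checkpoint.add(snapshot(task));
        }

        // Restore with parallelism 2: each remaining value is assigned to exactly one subtask.
        List<Deque<Long>> restored = redistribute(checkpoint, 2);
        System.out.println(restored); // prints [[3, 9, 7, 8], [6, 4, 5]]
    }
}
```

With three subtasks each emitting one value before the checkpoint and a restore at parallelism 2, every remaining value is emitted exactly once, but the first restored subtask receives 3, 9, 7, 8 in that order, which illustrates the Javadoc's caveat that ascending order is not preserved.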
> Make the production functions rescalable (apart from the Rolling/Bucketing Sinks)
> ---------------------------------------------------------------------------------
>
> Key: FLINK-5163
> URL: https://issues.apache.org/jira/browse/FLINK-5163
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This issue targets porting all the functions in the production code to the
> new state abstractions. These functions are:
> 1) StatefulSequenceSource
> 2) MessageAcknowledgingSourceBase
> 3) FromElementsFunction
> 4) ContinuousFileMonitoringFunction
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)