[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104907#comment-16104907 ]
ASF GitHub Bot commented on FLINK-6215:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3669#discussion_r130081125
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
* @param end End of the range of numbers to emit.
*/
public StatefulSequenceSource(long start, long end) {
+ Preconditions.checkArgument(start <= end);
this.start = start;
this.end = end;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- Preconditions.checkState(this.checkpointedState == null,
+ Preconditions.checkState(checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.checkpointedState =
context.getOperatorStateStore().getOperatorState(
new ListStateDescriptor<>(
- "stateful-sequence-source-state",
- LongSerializer.INSTANCE
+ "stateful-sequence-source-state",
+ new TupleSerializer<>(
+ (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+ )
)
);
- this.valuesToEmit = new ArrayDeque<>();
+ this.endToNextOffsetMapping = new HashMap<>();
if (context.isRestored()) {
- // upon restoring
-
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
+ for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
--- End diff --
nit: there should be a space on both sides of the ":" in the for-each loop, i.e. `partitionInfo : checkpointedState.get()`
> Make the StatefulSequenceSource scalable.
> -----------------------------------------
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> Currently, the {{StatefulSequenceSource}} first instantiates all the elements
> to emit and keeps them in memory. This does not scale: for large sequences of
> elements it can lead to out-of-memory errors.
> To solve this, we can pre-partition the sequence of elements based on the
> {{maxParallelism}} parameter, and keep state (to checkpoint) only per such
> partition.
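
As a rough illustration of the pre-partitioning idea described above, here is a minimal standalone sketch. It is not the code from the pull request: the class SequencePartitioner and the methods partition and emitNext are hypothetical names, and the even-split strategy is an assumption. Only the shape of the state (a map from each partition's end to the next offset to emit, mirroring endToNextOffsetMapping in the diff) is taken from the change under review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: illustrates keeping one
// (end, nextOffset) pair per partition instead of every element.
class SequencePartitioner {

    // Splits the inclusive range [start, end] into at most maxParallelism
    // contiguous partitions and returns a map: partition end -> next offset.
    // Assumption: the range is small enough that (end - start + 1) and
    // (end + 1) do not overflow a long.
    static Map<Long, Long> partition(long start, long end, int maxParallelism) {
        if (start > end) {
            throw new IllegalArgumentException("start must be <= end");
        }
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("maxParallelism must be positive");
        }
        long total = end - start + 1;
        long base = total / maxParallelism;      // minimum partition size
        long remainder = total % maxParallelism; // first partitions get one extra
        Map<Long, Long> endToNextOffset = new LinkedHashMap<>();
        long next = start;
        for (int i = 0; i < maxParallelism && next <= end; i++) {
            long size = base + (i < remainder ? 1 : 0);
            long partitionEnd = next + size - 1;
            endToNextOffset.put(partitionEnd, next);
            next = partitionEnd + 1;
        }
        return endToNextOffset;
    }

    // Returns the next value of the partition ending at partitionEnd and
    // advances its offset; only the offset has to be checkpointed.
    static long emitNext(Map<Long, Long> endToNextOffset, long partitionEnd) {
        Long next = endToNextOffset.get(partitionEnd);
        if (next == null) {
            throw new IllegalArgumentException("unknown partition: " + partitionEnd);
        }
        if (next > partitionEnd) {
            throw new IllegalStateException("partition exhausted: " + partitionEnd);
        }
        endToNextOffset.put(partitionEnd, next + 1);
        return next;
    }

    public static void main(String[] args) {
        Map<Long, Long> state = partition(0, 9, 4);
        System.out.println(state);
        System.out.println(emitNext(state, 5L));
        System.out.println(state);
    }
}
```

With this layout the memory and checkpoint footprint is bounded by the number of partitions rather than by the length of the sequence, which is the scalability property the issue asks for.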