GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130081876
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
 ---
    @@ -61,52 +65,89 @@
         * @param end End of the range of numbers to emit.
         */
        public StatefulSequenceSource(long start, long end) {
    +           Preconditions.checkArgument(start <= end);
                this.start = start;
                this.end = end;
        }
     
        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
     
    -           Preconditions.checkState(this.checkpointedState == null,
    +           Preconditions.checkState(checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
     
                this.checkpointedState = 
context.getOperatorStateStore().getOperatorState(
                        new ListStateDescriptor<>(
    -                           "stateful-sequence-source-state",
    -                           LongSerializer.INSTANCE
    +                           "stateful-sequence-source-state", 
    +                                   new TupleSerializer<>(
    +                                                   (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
    +                                                   new TypeSerializer<?>[] 
{ LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +                                   )
                        )
                );
     
    -           this.valuesToEmit = new ArrayDeque<>();
    +           this.endToNextOffsetMapping = new HashMap<>();
                if (context.isRestored()) {
    -                   // upon restoring
    -
    -                   for (Long v : this.checkpointedState.get()) {
    -                           this.valuesToEmit.add(v);
    +                   for (Tuple2<Long, Long> partitionInfo: 
checkpointedState.get()) {
    +                           Long prev = 
endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +                           Preconditions.checkState(prev == null,
    +                                           getClass().getSimpleName() + " 
: Duplicate entry when restoring.");
                        }
                } else {
    -                   // the first time the job is executed
    -
    -                   final int stepSize = 
getRuntimeContext().getNumberOfParallelSubtasks();
                        final int taskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
    -                   final long congruence = start + taskIdx;
    +                   final int parallelTasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +                   final long totalElements = Math.abs(end - start + 1L);
    +                   final int maxParallelism = 
getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +                   final int totalPartitions = totalElements < 
Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : 
maxParallelism;
     
    -                   long totalNoOfElements = Math.abs(end - start + 1);
    -                   final int baseSize = safeDivide(totalNoOfElements, 
stepSize);
    -                   final int toCollect = (totalNoOfElements % stepSize > 
taskIdx) ? baseSize + 1 : baseSize;
    +                   Tuple2<Integer, Integer> localPartitionRange = 
getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +                   int localStartIdx = localPartitionRange.f0;
    +                   int localEndIdx = localStartIdx + 
localPartitionRange.f1;
     
    -                   for (long collected = 0; collected < toCollect; 
collected++) {
    -                           this.valuesToEmit.add(collected * stepSize + 
congruence);
    +                   for (int partIdx = localStartIdx; partIdx < 
localEndIdx; partIdx++) {
    +                           Tuple2<Long, Long> limits = 
getPartitionLimits(totalElements, totalPartitions, partIdx);
    +                           endToNextOffsetMapping.put(limits.f1, 
limits.f0);
                        }
                }
        }
     
    +   private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int 
parallelTasks, int taskIdx) {
    +           int minPartitionSliceSize = totalPartitions / parallelTasks;
    +           int remainingPartitions = totalPartitions - 
minPartitionSliceSize * parallelTasks;
    +
    +           int localRangeStartIdx = taskIdx * minPartitionSliceSize + 
Math.min(taskIdx, remainingPartitions);
    +           int localRangeSize = taskIdx < remainingPartitions ? 
minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +           return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +   }
    +
    +   private Tuple2<Long, Long> getPartitionLimits(long totalElements, int 
totalPartitions, long partitionIdx) {
    +           long minElementPartitionSize = totalElements / totalPartitions;
    +           long remainingElements = totalElements - 
minElementPartitionSize * totalPartitions;
    +           long startOffset = start;
    +
    +           for (int idx = 0; idx < partitionIdx; idx++) {
    +                   long partitionSize = idx < remainingElements ? 
minElementPartitionSize + 1L : minElementPartitionSize;
    +                   startOffset += partitionSize;
    +           }
    +
    +           long partitionSize = partitionIdx < remainingElements ? 
minElementPartitionSize + 1L : minElementPartitionSize;
    +           return new Tuple2<>(startOffset, startOffset + partitionSize);
    +   }
    +
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
    -           while (isRunning && !this.valuesToEmit.isEmpty()) {
    -                   synchronized (ctx.getCheckpointLock()) {
    -                           ctx.collect(this.valuesToEmit.poll());
    +           for (Map.Entry<Long, Long> partition: 
endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    I wonder if it would make sense to emit local ranges in order of increasing 
"end offsets". That way, the emitted values would at least still always be increasing.
    While we can't really guarantee ordering with this new rescalable 
implementation, we could still make a best effort at it locally. What do you 
think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to