RyanSkraba commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206364163


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", TypeInformation.of(InternalState.class));
+        this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Lists.newArrayList();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> internalStates.add(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            // The first time the job is executed.
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx);
+            internalStates.add(state);
+        }
+    }
 
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+    public Long nextValue() {
+        Iterator<InternalState> iterator = internalStates.iterator();

Review Comment:
   This seems like a good idea, and it's probably appropriate to create some 
JIRAs to implement it in subsequent, incremental steps.
   
   The current internal states (before this PR) for a generator from 1 to 1M 
would hold 1 million values: `1,2,3...1000000`.  If that state were 
checkpointed, **(1)** it would take quite a bit of space and **(2)** I'm not 
sure that restoring it at a lower parallelism retains any lowest-first semantics.
   
   This PR solves **(1)**, but doesn't change much in terms of **(2)**.  The 
internal states would be `(1,3), (2,3), (3,3)` (assuming a parallelism of 3).  
   
   If it were restored to a parallelism of two (after the PR), and one task got 
`(1,3), (2,3)` as its internal states, it would emit `1,4,7,10 ... 1000000, 2, 
5, 8 ...`.  
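
   To make the ordering concrete, here is a minimal sketch of that restore 
scenario. It assumes (as described above, not verified against the PR code) 
that a task drains each internal state completely before moving on to the 
next. `SequentialDrainDemo` and `drainSequentially` are made-up names, each 
state is modelled as a plain `{first, step}` pair rather than the PR's 
`InternalState`, and the end is shortened to 10 to keep the output small.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical names, not code from the PR.
final class SequentialDrainDemo {

    /**
     * Emits every value of each {first, step} state up to and including end,
     * finishing one state before starting the next (the assumed behaviour).
     */
    public static List<Long> drainSequentially(long[][] states, long end) {
        List<Long> emitted = new ArrayList<>();
        for (long[] state : states) {
            long first = state[0];
            long step = state[1];
            // Overflow near Long.MAX_VALUE is ignored in this sketch.
            for (long value = first; value <= end; value += step) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // One restored task holding the states (1,3) and (2,3).
        long[][] restored = {{1, 3}, {2, 3}};
        System.out.println(drainSequentially(restored, 10));
    }
}
```

   With the end at 10 this prints `[1, 4, 7, 10, 2, 5, 8]`: `2` only shows up 
once the first state is exhausted.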
   
   If my understanding is correct, emitting `2` _after_ `1000000` is a 
possibility in both scenarios.  There's an opportunity here:
   
   * Improve the logic so that a task always emits the lowest of the possible 
values from the internal states that it possesses (a priority queue is 
probably unnecessary; it should end up roughly equivalent to sorting and 
cycling through the states).  This is really a new improvement that could 
also be applied to the code as it is today.
   * Avoid performance skew after reducing parallelism.  Currently, going from 
N tasks to N-1 tasks will double the length of time it takes, since one task 
will have twice as many values to emit as the others.  This is an undesirable 
consequence of this PR, but can probably also be discussed and implemented in a 
subsequent step.
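
   A rough sketch of the first point, always emitting the lowest pending value 
across the states a task holds. `LowestFirstDemo`, `Cursor` and 
`emitLowestFirst` are hypothetical names, not part of the PR. It uses a 
`PriorityQueue` only because that stays correct if restored states have 
progressed by different amounts; sorting once and cycling should be enough 
when all states share the same step and are equally far along.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration only: hypothetical names, not code from the PR.
final class LowestFirstDemo {

    /** Hypothetical cursor: the next value to emit and the stride between values. */
    static final class Cursor {
        long next;
        final long step;

        Cursor(long next, long step) {
            this.next = next;
            this.step = step;
        }
    }

    /**
     * Repeatedly emits the smallest pending value among all cursors until every
     * cursor has passed end. The cursors are advanced in place.
     */
    public static List<Long> emitLowestFirst(List<Cursor> cursors, long end) {
        PriorityQueue<Cursor> queue =
                new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.next));
        for (Cursor cursor : cursors) {
            if (cursor.next <= end) {
                queue.add(cursor);
            }
        }

        List<Long> emitted = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor lowest = queue.poll();
            emitted.add(lowest.next);
            // Overflow near Long.MAX_VALUE is ignored in this sketch.
            lowest.next += lowest.step;
            if (lowest.next <= end) {
                queue.add(lowest); // re-insert so it is ordered by its new value
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Cursor> restored = List.of(new Cursor(1, 3), new Cursor(2, 3));
        System.out.println(emitLowestFirst(restored, 10));
    }
}
```

   For the states `(1,3), (2,3)` and an end of 10, this emits `1, 2, 4, 5, 7, 
8, 10`, so `2` is no longer held back behind the whole first sequence.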
   
   One thing that I really like about this PR is that it enables huge sequences 
that aren't possible today, in an efficient manner.  This is a really 
compelling advantage over the current state in every way!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
