XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206745426


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", TypeInformation.of(InternalState.class));
+        this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Lists.newArrayList();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> internalStates.add(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            // The first time the job is executed.
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx);
+            internalStates.add(state);
+        }
+    }
 
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+    public Long nextValue() {
+        Iterator<InternalState> iterator = internalStates.iterator();
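
For readers without the surrounding file: the change stores one `InternalState` entry per subtask slice instead of pre-materialized `Long` values. A minimal sketch of what such a value class could look like - the field names below are illustrative assumptions, not taken from the PR:

```java
// Hypothetical sketch of the InternalState value class referenced in the diff.
// It captures a subtask's slice of the sequence: the creating subtask's index,
// the stride (the parallelism at creation time), and the next value to emit.
// Public fields plus a public no-arg constructor keep it a valid Flink POJO,
// which TypeInformation.of(InternalState.class) relies on.
public class InternalState {
    public int taskIdx;
    public long stepSize;
    public long nextValue;

    public InternalState() {} // required for POJO serialization

    public InternalState(int taskIdx, long stepSize, long nextValue) {
        this.taskIdx = taskIdx;
        this.stepSize = stepSize;
        this.nextValue = nextValue;
    }
}
```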

Review Comment:
   Thanks for sharing your view, @RyanSkraba. I think you're right - the 
implementation in master (i.e. without this PR's change) doesn't properly 
support reducing the parallelism either.
   ```
   Avoid performance skew after reducing parallelism. Currently, going from N 
tasks to N-1 tasks will double the length of time it takes, since one task will 
have twice as many values to emit as the others. This is an undesirable 
consequence of this PR, but can probably also be discussed and implemented in a 
subsequent step.
   ```
   I don't understand that part: Isn't the performance skew you're describing 
more general and not really connected to the `DataGenerator`? That skew happens 
in any case when the parallelism is reduced. It's more a question of how Flink 
redistributes the operator state after the parallelism has been reduced. 
:thinking: Therefore, I don't see this as a problem for this PR.
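   
   To make the redistribution point concrete, here is a standalone sketch 
(plain Java, not the Flink API) of the even-split assignment Flink applies to 
non-union operator list state on rescale. Going from 3 subtasks to 2, one 
subtask inevitably ends up with two state slices - which is roughly the skew 
described above, and independent of this PR:
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   // Simplified model of rescaling evenly-split operator list state:
   // all checkpointed entries are pooled and dealt out across the new
   // set of subtasks.
   public class RescaleSketch {
   
       static List<List<String>> redistribute(List<String> entries, int newParallelism) {
           List<List<String>> assignment = new ArrayList<>();
           for (int i = 0; i < newParallelism; i++) {
               assignment.add(new ArrayList<>());
           }
           for (int i = 0; i < entries.size(); i++) {
               assignment.get(i % newParallelism).add(entries.get(i));
           }
           return assignment;
       }
   
       public static void main(String[] args) {
           // Three per-subtask slices checkpointed at parallelism 3 ...
           List<String> slices = Arrays.asList("slice-0", "slice-1", "slice-2");
           // ... restored at parallelism 2: one subtask gets two slices.
           System.out.println(redistribute(slices, 2));
           // -> [[slice-0, slice-2], [slice-1]]
       }
   }
   ```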



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
