XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1205133307


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", 
LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+        this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Lists.newArrayList();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> 
internalStates.add(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            // The first time the job is executed.
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(taskIdx, stepSize, start + 
taskIdx);
+            internalStates.add(state);
+        }
+    }
 
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+    public Long nextValue() {
+        Iterator<InternalState> iterator = internalStates.iterator();

Review Comment:
   Yes, it feels like `internalState` should be some kind of priority queue 
that serves the internal state with the lowest element. I really would prefer 
having this put into some kind of unit test as well. :thinking: WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to