RyanSkraba commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206825892


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", TypeInformation.of(InternalState.class));
+        this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Lists.newArrayList();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> internalStates.add(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            // The first time the job is executed.
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx);
+            internalStates.add(state);
+        }
+    }
 
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+    public Long nextValue() {
+        Iterator<InternalState> iterator = internalStates.iterator();

Review Comment:
   @xuzhiwen1255 This sounds perfect!
   > But I have an idea, that is, for each subtask, we let him generate more InternalState
   
   That is to say, instead of having N internal states when there are N tasks initially, we would have 2N, or 10N, or 100N. That number doesn't have to be extremely high to avoid skew here, and we'd still benefit from the "concise" and efficient InternalState you've implemented here.
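   To make the over-partitioning idea concrete, here's a rough sketch (the `Split` type and `splitsFor` helper are hypothetical, not the PR's actual `InternalState` API): with N subtasks and a factor k, we'd carve the sequence into M = N·k congruence classes, and subtask i would own classes i, i+N, …, i+(k-1)N. On restore with a different parallelism, those M constant-size splits can be redistributed more evenly than N can.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical sketch of the "more InternalState per subtask" idea. */
   public class OverPartitionSketch {
   
       /** Minimal stand-in for a split: emits next, next + step, next + 2*step, ... */
       static final class Split {
           final long next; // first value this split will emit
           final long step; // distance between consecutive values
           Split(long next, long step) {
               this.next = next;
               this.step = step;
           }
       }
   
       /**
        * Instead of one split per subtask, create {@code factor} splits per subtask.
        * With N subtasks and factor k there are M = N*k congruence classes mod M;
        * subtask {@code taskIdx} owns classes taskIdx, taskIdx+N, ..., taskIdx+(k-1)*N.
        */
       static List<Split> splitsFor(long start, int taskIdx, int parallelism, int factor) {
           long totalSplits = (long) parallelism * factor;
           List<Split> splits = new ArrayList<>(factor);
           for (int j = 0; j < factor; j++) {
               long congruenceClass = taskIdx + (long) j * parallelism;
               splits.add(new Split(start + congruenceClass, totalSplits));
           }
           return splits;
       }
   }
   ```
   
   For example, with parallelism 2 and factor 3, subtask 0 would own splits starting at start+0, start+2, start+4 (all stepping by 6), and subtask 1 the odd offsets, so together they still cover the full range exactly once.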
   
   I think there might be some cool way of splitting up the ranges differently, but I'd like to think about it!  This should probably go on the future JIRA though :D
   
   @XComp Yes, this issue would happen to any source that creates "one split per task" on initialization, where the split corresponds to the operator state -- but the previous implementation didn't do that.  It kept one state per element to emit, so it _could_ be perfectly rebalanced (but was overall pretty inefficient).
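   For context, a rough sketch of the two state models (hypothetical helper methods, not the PR's exact classes): the old generator checkpointed every remaining element as a `Long`, so state size grew with the range, while the new one checkpoints a constant-size descriptor per split.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Rough comparison of the two checkpoint models (for illustration only). */
   public class StateModels {
   
       /** Old model: one Long per not-yet-emitted element -- perfectly
        *  rebalanceable on restore, but the state grows with the range. */
       static List<Long> oldStyleState(long start, long end, long step, long congruence) {
           List<Long> valuesToEmit = new ArrayList<>();
           for (long v = start + congruence; v <= end; v += step) {
               valuesToEmit.add(v);
           }
           return valuesToEmit;
       }
   
       /** New model: a constant-size descriptor (next value, step, end),
        *  no matter how many elements remain. */
       static long[] newStyleState(long next, long step, long end) {
           return new long[] {next, step, end};
       }
   }
   ```
   
   The trade-off the thread is discussing: the old model rebalances perfectly because elements are individually redistributable, while the new model is tiny but only as redistributable as the number of splits.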
   
   I agree that it's not a problem for this PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
