xuzhiwen1255 commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206414704


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", TypeInformation.of(InternalState.class));
+        this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Lists.newArrayList();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> internalStates.add(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            // The first time the job is executed.
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx);
+            internalStates.add(state);
+        }
+    }
 
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+    public Long nextValue() {
+        Iterator<InternalState> iterator = internalStates.iterator();

Review Comment:
   > * Avoid performance skew after reducing parallelism. Currently, going
   > from N tasks to N-1 tasks will double the length of time it takes, since one
   > task will have twice as many values to emit as the others. This is an
   > undesirable consequence of this PR, but can probably also be discussed and
   > implemented in a subsequent step.
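   
   To make the skew concrete, here is a minimal sketch. It assumes that each of the N original subtasks left behind one equally sized state and that the restored states are handed out round-robin; `RescaleSkewSketch` and `valuesPerTask` are hypothetical names for illustration only, not part of the PR or of Flink.

   ```java
   import java.util.Arrays;

   class RescaleSkewSketch {

       /**
        * Hands out stateCount equally sized states round-robin (an assumption of
        * this sketch) and returns how many values each new task has to emit.
        */
       static long[] valuesPerTask(int stateCount, long valuesPerState, int newParallelism) {
           long[] load = new long[newParallelism];
           for (int i = 0; i < stateCount; i++) {
               load[i % newParallelism] += valuesPerState;
           }
           return load;
       }

       public static void main(String[] args) {
           // 100 values generated with parallelism 5, restored with parallelism 4.
           System.out.println(Arrays.toString(valuesPerTask(5, 20, 4))); // [40, 20, 20, 20]
       }
   }
   ```

   Under these assumptions one task ends up with two states and therefore twice the work of the others, which is the doubling described in the quote.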
   
   Yes, I noticed this problem, but I haven't come up with a better solution yet.
   
   I do have one idea, though: let each subtask create several InternalStates instead of one. Roughly:
   
   Assume a parallelism of 5 and 100 values to emit. Each subtask is logically divided into multiple InternalStates.
   The current InternalState looks like this:
   ```java
   class InternalState {
       long stepSize;
       long nextValue;
   }
   ```
   When subtask-0 is initialized, only a single `InternalState(5,0)` is created.
   
   
   My idea is to add an `end` field that marks the end value of the current InternalState:
   ```java
   class InternalState {
       long stepSize;
       long nextValue;
       long end;
   }
   ```
   Splitting at initialization, subtask-0 would then produce two states: `InternalState(5,0,50)` and `InternalState(5,50,100)`.
   Of course, it doesn't have to be two. It can be more, so that when the parallelism is changed the states can be spread more evenly and no single task has to take over a whole extra subtask's worth of values.
   @XComp @RyanSkraba  WDYT?
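   
   A rough sketch of the splitting idea, not the PR's implementation. It assumes `end` is an exclusive bound and that each segment starts where the previous one stops; `SplitStateSketch`, `split`, `hasNext`, and `next` are hypothetical names used only for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class SplitStateSketch {

       /** Hypothetical state: yields nextValue, nextValue + stepSize, ... while below end. */
       static final class InternalState {
           final long stepSize;
           long nextValue;
           final long end; // exclusive upper bound (an assumption of this sketch)

           InternalState(long stepSize, long nextValue, long end) {
               this.stepSize = stepSize;
               this.nextValue = nextValue;
               this.end = end;
           }

           boolean hasNext() {
               return nextValue < end;
           }

           long next() {
               long value = nextValue;
               nextValue += stepSize;
               return value;
           }
       }

       /** Cuts one subtask's values (first, first + stepSize, ... below end) into `pieces` states. */
       static List<InternalState> split(long stepSize, long first, long end, int pieces) {
           long total = (end - first + stepSize - 1) / stepSize; // values owned by this subtask
           List<InternalState> states = new ArrayList<>();
           long from = first;
           for (int p = 0; p < pieces; p++) {
               // Spread any remainder over the first segments.
               long count = total / pieces + (p < total % pieces ? 1 : 0);
               long to = Math.min(from + count * stepSize, end);
               states.add(new InternalState(stepSize, from, to));
               from += count * stepSize;
           }
           return states;
       }

       public static void main(String[] args) {
           // Subtask 0 of 5, values 0..99, cut into two states.
           for (InternalState s : split(5, 0, 100, 2)) {
               System.out.println("InternalState(" + s.stepSize + "," + s.nextValue + "," + s.end + ")");
           }
       }
   }
   ```

   With, say, four segments per subtask there would be 20 states in total, which divide evenly over 4 tasks (5 each) after rescaling from 5.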
   
   I will open a Jira ticket to track this issue, and we can continue the discussion there.
   


