XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1257913768
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -189,8 +206,28 @@ public static SequenceGenerator<String>
stringGenerator(long start, long end) {
return new SequenceGenerator<String>(start, end) {
@Override
public String next() {
- return valuesToEmit.poll().toString();
+ return nextValue().toString();
}
};
}
+
+ /**
+ * The internal state of the sequence generator, which is used to record the latest state of the
+ * sequence value sent by the current sequence generator. When recovering from the state, it is
+ * guaranteed to continue sending the sequence value from the latest state.
+ */
+ private static class InternalState implements Comparable<InternalState> {
Review Comment:
What's your opinion on renaming this internal class to `SubTaskState`?
`InternalState` sounds too generic. :thinking:
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
Review Comment:
```suggestion
final ListStateDescriptor<InternalState> stateDescriptor =
```
There are a few other code locations where we could make use of the `final`
keyword for local variables.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
+
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
/**
* A stateful, re-scalable {@link DataGenerator} that emits each number from a
given interval
* exactly once, possibly in parallel.
+ *
+ * <p>It maintains a state internally to record the position of the current
subtask sending
+ * sequence. When the task resumes, it will continue to send the sequence
value according to the
+ * position sent by the state, until all the sequences have been sent.
+ *
+ * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there
may be cases where
+ * subtasks are running empty. When the degree of parallelism decreases, there
may be cases where
+ * one subtask handles multiple states.
Review Comment:
Shall we also add that decreasing the parallelism might cause the sequence
to be emitted out of order?
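
To illustrate the concern, here is a minimal sketch, not the PR's implementation. The class `RescaleOrderSketch`, its `State` type, and the scenario are hypothetical: two subtasks with step size 2 have progressed unevenly before a checkpoint, and after downscaling a single subtask restores both states. Even when the smallest pending value is emitted first, values lower than ones already sent before the checkpoint appear afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model; names and drain policy are assumptions, not the PR's code.
class RescaleOrderSketch {
    // Position of one former subtask: next value to emit and its step size.
    static final class State implements Comparable<State> {
        long next;
        final long step;

        State(long next, long step) {
            this.next = next;
            this.step = step;
        }

        @Override
        public int compareTo(State other) {
            return Long.compare(next, other.next);
        }
    }

    // Emits every remaining value up to `end`, always taking the smallest pending one.
    static List<Long> drain(List<State> restored, long end) {
        PriorityQueue<State> queue = new PriorityQueue<>(restored);
        List<Long> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            State s = queue.poll();
            if (s.next > end) {
                continue; // this state is exhausted and is dropped
            }
            out.add(s.next);
            s.next += s.step;
            queue.offer(s);
        }
        return out;
    }

    // Assumed scenario for the range 0..9 with an original parallelism of 2.
    static List<Long> afterDownscale() {
        List<State> restored = new ArrayList<>();
        restored.add(new State(8, 2)); // former subtask 0 had already emitted 0, 2, 4, 6
        restored.add(new State(3, 2)); // former subtask 1 had only emitted 1
        return drain(restored, 9);
    }
}
```

In this scenario the restored subtask emits 3 and 5 although 6 was already sent before the checkpoint, which is the kind of out-of-order output the Javadoc note could mention.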
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Queues.newPriorityQueue();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.offer(state));
} else {
- // the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
- final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
-
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
-
- for (long collected = 0; collected < toCollect; collected++) {
- this.valuesToEmit.add(collected * stepSize + congruence);
+ // The first time the job is executed.
+ final int startOffset = runtimeContext.getIndexOfThisSubtask();
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ InternalState state = new InternalState(stepSize, start +
startOffset);
+ internalStates.offer(state);
+ }
+ }
+
+ public Long nextValue() {
+ if (internalStates.isEmpty()) {
+ // Before calling nextValue method, you should call hasNext to check.
Review Comment:
```suggestion
```
This comment doesn't add any value, don't you think? :thinking: If you want
to give this hint to the developer, you might want to add it to the error
message. Otherwise, you're expecting the developer to do a code investigation
to find the hint of this comment.
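
A minimal sketch of what this could look like, assuming a simplified stand-in for the generator (the class `HintedSequence` and the message wording are hypothetical and not taken from the PR): the hint moves out of the code comment and into the exception message, so it shows up in the stack trace.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for the generator; not the actual SequenceGenerator code.
class HintedSequence {
    private final Queue<Long> values = new ArrayDeque<>();

    HintedSequence(long start, long end) {
        for (long v = start; v <= end; v++) {
            values.add(v);
        }
    }

    boolean hasNext() {
        return !values.isEmpty();
    }

    Long nextValue() {
        if (values.isEmpty()) {
            // The hint is part of the message instead of a source comment.
            throw new NoSuchElementException(
                    "No more values to emit. Call hasNext() before calling nextValue().");
        }
        return values.poll();
    }
}
```

A caller that forgets the `hasNext()` check then sees the hint directly in the failure, without having to read the source.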
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
Review Comment:
```suggestion
import org.apache.flink.shaded.guava31.com.google.common.collect.Queues;
```
The `guava30` import won't work anymore because the `flink-shaded`
dependencies were upgraded (FLINK-32032). Rebasing the branch might reveal
other affected code locations.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Queues.newPriorityQueue();
Review Comment:
```suggestion
this.internalStates = Queues.newPriorityQueue();
```
Can't we make `internalStates` a `final` field? We don't need to
re-instantiate it. I'm then wondering whether we could add a Precondition that
checks here that the queue is empty when calling the `open` method or whether
there are situations where the queue could still have entries (in which case we
would have to call `clear()`)?
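
A rough sketch of the `final` field plus emptiness check, using plain Java only. The class `FinalQueueSketch`, the simplified `open(long)` signature, and the error message are hypothetical; in the real class the check would presumably go through Flink's `Preconditions.checkState`, and whether the queue can legitimately be non-empty at this point is exactly the open question above.

```java
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical, simplified stand-in; the real class works with Flink's state APIs.
class FinalQueueSketch {
    // Instantiated once, so open() no longer needs to create a new queue.
    private final Queue<Long> internalStates = new PriorityQueue<>();

    void open(long firstValue) {
        // Plain-Java equivalent of a precondition check on the queue being empty.
        if (!internalStates.isEmpty()) {
            throw new IllegalStateException(
                    "The state queue must be empty when open() is called.");
        }
        internalStates.offer(firstValue);
    }

    int pendingStates() {
        return internalStates.size();
    }
}
```

If there turn out to be situations where entries survive until `open` is called again, the check would be replaced by `internalStates.clear()`.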
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
+
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
/**
* A stateful, re-scalable {@link DataGenerator} that emits each number from a
given interval
* exactly once, possibly in parallel.
+ *
+ * <p>It maintains a state internally to record the position of the current
subtask sending
+ * sequence. When the task resumes, it will continue to send the sequence
value according to the
+ * position sent by the state, until all the sequences have been sent.
+ *
+ * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there
may be cases where
+ * subtasks are running empty. When the degree of parallelism decreases, there
may be cases where
+ * one subtask handles multiple states.
*/
@Experimental
public abstract class SequenceGenerator<T> implements DataGenerator<T> {
private final long start;
private final long end;
+ /**
+ * Save the intermediate state of the data to be sent by the current subtask,when the state
Review Comment:
```suggestion
* Save the intermediate state of the data to be sent by the current subtask, when the state
```
nit
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -189,8 +206,28 @@ public static SequenceGenerator<String>
stringGenerator(long start, long end) {
return new SequenceGenerator<String>(start, end) {
@Override
public String next() {
- return valuesToEmit.poll().toString();
+ return nextValue().toString();
}
};
}
+
+ /**
+ * The internal state of the sequence generator, which is used to record the latest state of the
Review Comment:
```suggestion
* The internal state of the sequence generator's subtask(s), which is used to record the latest state of the
```
Is this more specific?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.