XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1204355579
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Lists.newArrayList();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.add(state));
} else {
- // the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ // The first time the job is executed.
final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
-
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ InternalState state = new InternalState(taskIdx, stepSize, start +
taskIdx);
Review Comment:
It's not that easy to grasp why we use `taskIdx` in this context. Could we
come up with a variable name that describes its purpose in a better way?
(tbh, I'm struggling to come up with a proposal here.)
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Lists.newArrayList();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.add(state));
} else {
- // the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ // The first time the job is executed.
final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
-
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ InternalState state = new InternalState(taskIdx, stepSize, start +
taskIdx);
Review Comment:
Why do we pass the `taskIdx` as a dedicated parameter? It doesn't look like
we're using it anywhere? :thinking:
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Lists.newArrayList();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.add(state));
} else {
- // the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ // The first time the job is executed.
final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
-
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ InternalState state = new InternalState(taskIdx, stepSize, start +
taskIdx);
+ internalStates.add(state);
+ }
+ }
- for (long collected = 0; collected < toCollect; collected++) {
- this.valuesToEmit.add(collected * stepSize + congruence);
+ public Long nextValue() {
+ Iterator<InternalState> iterator = internalStates.iterator();
+ if (iterator.hasNext()) {
+ InternalState state = iterator.next();
+ long currentValue = state.nextValue;
+ try {
+ state.nextValue = Math.addExact(currentValue, state.stepSize);
+ // All sequence values are cleared from the stateList after
they have been sent.
+ if (state.nextValue > this.end) {
+ iterator.remove();
+ }
+ } catch (ArithmeticException e) {
+ // When the limit is exceeded, it means that the data has been
sent and needs to
+ // be cleared from the state.
+ iterator.remove();
}
+
+ return currentValue;
}
+
+ // Before calling nextValue method, you should call hasNext to check.
+ throw new IllegalStateException(
Review Comment:
The `nextValue` method is called by `Iterator.next()`. The `Iterator`'s
`next()` method has a clear contract for this kind of situation which states
that a `NoSuchElementException` should be thrown. We might want to follow that,
if I'm not mistaken.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +71,46 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Lists.newArrayList();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.add(state));
} else {
- // the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ // The first time the job is executed.
final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
-
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ InternalState state = new InternalState(taskIdx, stepSize, start +
taskIdx);
+ internalStates.add(state);
+ }
+ }
- for (long collected = 0; collected < toCollect; collected++) {
- this.valuesToEmit.add(collected * stepSize + congruence);
+ public Long nextValue() {
+ Iterator<InternalState> iterator = internalStates.iterator();
Review Comment:
When do we have the case that there is more than one state in
`internalStates`? Is that when we're reducing the parallelism and multiple
states get picked up by a single operator? Is it enough to always select the
first `InternalState`? It feels to me that we're processing one state first
until we reach the end and would then process the next state, which starts at
a much lower number again? :thinking:
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -42,9 +44,13 @@
private final long start;
private final long end;
+ /**
+ * Save the intermediate state of the data to be sent by the current
subtask,when the state
+ * restore, the sequence values continue to be sent based on the
intermediate state.
+ */
+ private transient ArrayList<InternalState> internalStates;
Review Comment:
```suggestion
private transient Queue<InternalState> internalStates;
```
nit: Wouldn't `Queue` be the better interface? :thinking:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]