RyanSkraba commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1191359995
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +72,46 @@ public void open(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been
initialized.");
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- name + "-sequence-state",
LongSerializer.INSTANCE));
- this.valuesToEmit = new ArrayDeque<>();
- if (context.isRestored()) {
- // upon restoring
+ ListStateDescriptor<InternalState> stateDescriptor =
+ new ListStateDescriptor<>(
+ name + "-sequence-state",
TypeInformation.of(InternalState.class));
+ this.checkpointedState =
context.getOperatorStateStore().getListState(stateDescriptor);
+ this.internalStates = Lists.newArrayList();
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
- }
+ totalNoOfElements = Math.abs(end - start + 1);
+ if (context.isRestored()) {
+ checkpointedState.get().forEach(state ->
internalStates.add(state));
} else {
// the first time the job is executed
- final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
final int taskIdx = runtimeContext.getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
+ final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+ internalStates.add(new InternalState(0, taskIdx, stepSize));
+ }
+ }
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect =
- (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ private long toCollect(long baseSize, long stepSize, int taskIdx) {
+ return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 :
baseSize;
+ }
- for (long collected = 0; collected < toCollect; collected++) {
- this.valuesToEmit.add(collected * stepSize + congruence);
+ public Long nextValue() {
+ Iterator<InternalState> iterator = internalStates.iterator();
+ if (iterator.hasNext()) {
+ InternalState state = iterator.next();
+ long nextSequence = state.collected * state.stepSize + (start +
state.taskId);
+ state.collected++;
Review Comment:
Since we update the current state on every iteration anyway, why not store the
current sequence value in the state instead of incrementing `collected`? The
*next* value is easy to compute (just add `stepSize`), and we know to drop the
state from the list once that value goes past `this.end`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]