mxm commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412796641
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws
Exception {
// We are about to get checkpointed. The elements buffered thus far
// have to be added to the global CheckpointElement state which will
// be used to emit elements later when this checkpoint is acknowledged.
- addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
- currentStateId = generateNewId();
- currentBufferingElementsHandler =
bufferingElementsHandlerFactory.get(currentStateId);
+ addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+ int newStateIndex = rotateAndGetStateIndex();
+ currentBufferingElementsHandler =
bufferingElementsHandlerFactory.get(newStateIndex);
}
/** Should be called when a checkpoint is completed. */
public void checkpointCompleted(long checkpointId) throws Exception {
Review comment:
It's good that you bring this up because I wanted to add a comment to
explain this. Since every checkpoint runs independently we are not guaranteed
to complete in sequential order. In case a later checkpoint completes before an
earlier one, we can safely process the elements of the earlier one. This method
is guaranteed not to overlap because Flink uses a single thread to execute
async task calls lie (a) triggering a checkpoint (b) completing a checkpoint.
When an earlier checkpoint completes after a later one has already completed,
we will do nothing here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]