mxm commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412796641



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws 
Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = 
bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = 
bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       It's good that you bring this up because I wanted to add a comment to 
explain this. Since every checkpoint runs independently we are not guaranteed 
to complete in sequential order. In case a later checkpoint completes before an 
earlier one, we can safely process the elements of the earlier one. This method 
is guaranteed not to overlap because Flink uses a single thread to execute 
async task calls lie (a) triggering a checkpoint (b) completing a checkpoint. 
When an earlier checkpoint completes after a later one has already completed, 
we will do nothing here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to