lukecwik commented on a change in pull request #12706:
URL: https://github.com/apache/beam/pull/12706#discussion_r479380211



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -905,6 +928,19 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
       bufferingDoFnRunner.checkpointCompleted(checkpointId);
     }
 
+    List<InMemoryBundleFinalizer.Finalization> finalizations =
+        pendingFinalizations.get(checkpointId);
+    if (finalizations != null) {
+      // remove old finalizations except for the current one
+      pendingFinalizations.clear();
+      pendingFinalizations.put(checkpointId, finalizations);
+
+      // confirm all finalizations
+      for (InMemoryBundleFinalizer.Finalization finalization : finalizations) {
+        finalization.getCallback().onBundleSuccess();

Review comment:
       Between two checkpoint boundaries, the DoFn may have requested multiple
finalizations (e.g. one per element processed), each with a different side
effect.
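
       To illustrate the point, here is a minimal, self-contained sketch of
several finalization callbacks accumulating between two checkpoints and all
being run once the checkpoint is confirmed. It does not use Beam or Flink
classes: `PendingFinalizationsSketch`, `register`, `snapshot`, and the use of
`Runnable` as a callback are hypothetical stand-ins, and dropping confirmed
entries is an assumption of the sketch rather than what this PR does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the operator's bookkeeping; not Beam's or Flink's API. */
public class PendingFinalizationsSketch {
  // checkpointId -> callbacks requested before that checkpoint was taken
  private final Map<Long, List<Runnable>> pending = new LinkedHashMap<>();
  // callbacks requested since the last checkpoint
  private final List<Runnable> sinceLastCheckpoint = new ArrayList<>();

  /** Called each time the DoFn requests a finalization, possibly once per element. */
  public void register(Runnable callback) {
    sinceLastCheckpoint.add(callback);
  }

  /** At snapshot time, everything requested so far is tied to this checkpoint. */
  public void snapshot(long checkpointId) {
    pending.put(checkpointId, new ArrayList<>(sinceLastCheckpoint));
    sinceLastCheckpoint.clear();
  }

  /**
   * Runs every callback tied to the confirmed checkpoint, in registration order,
   * and returns how many were run. Assumption of this sketch: the confirmed entry
   * and any older ones are dropped afterwards, so a repeated notification is a no-op.
   */
  public int notifyCheckpointComplete(long checkpointId) {
    List<Runnable> callbacks = pending.get(checkpointId);
    if (callbacks == null) {
      return 0;
    }
    for (Runnable callback : callbacks) {
      callback.run();
    }
    pending.keySet().removeIf(id -> id <= checkpointId);
    return callbacks.size();
  }

  public static void main(String[] args) {
    PendingFinalizationsSketch sketch = new PendingFinalizationsSketch();
    List<String> sideEffects = new ArrayList<>();

    // Three elements processed between checkpoints, each with its own side effect.
    sketch.register(() -> sideEffects.add("ack element 1"));
    sketch.register(() -> sideEffects.add("ack element 2"));
    sketch.register(() -> sideEffects.add("ack element 3"));

    sketch.snapshot(7L);
    int ran = sketch.notifyCheckpointComplete(7L);
    System.out.println(ran + " callbacks ran: " + sideEffects);
  }
}
```

       The list-per-checkpoint shape is what matters here: a single callback per
checkpoint would silently lose the side effects of every other element.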





