dpcollins-google commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582400673



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
           .lastClaimed()
           .ifPresent(
               lastClaimedOffset ->
-                  finalizer.afterBundleCommit(
-                      Instant.ofEpochMilli(Long.MAX_VALUE),
-                      () -> {
-                        Committer committer = committerFactory.apply(subscriptionPartition);
-                        committer.startAsync().awaitRunning();
-                        // Commit the next-to-deliver offset.
-                        committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                        committer.stopAsync().awaitTerminated();
-                      }));
+              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
+                Committer committer = committerFactory.apply(subscriptionPartition);
+                committer.startAsync().awaitRunning();
+                // Commit the next-to-deliver offset.
+                try {
+                  committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();

Review comment:
       1) I don't know of a way to prevent reprocessing without a finalizer.
Even the Kafka approach (committing offsets in a PTransform afterwards) can
reprocess: PCollections are unordered, so commits can potentially (even if
unlikely) be applied in the opposite order, which moves the committed offset
backwards. If I understand the execution model correctly, committing inline as
this change does can lead to duplicate commits only in the case of an internal
Dataflow failure, in which case there is spurious progress.
   
   2) For "Pipeline crashes", do you mean that the entire Dataflow job can
fail, or that the downstream processing just does `throw new RuntimeException()`?
In the second case, doesn't BundleFinalizer have the same issue? If the
first... I'm not sure it's worth protecting something in the runner against
runner incorrectness.
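   To make the ordering argument in point 1 concrete, here is a minimal Java
sketch. It is a hypothetical simulation, not Beam or Pub/Sub Lite code:
`CommitOrderingSketch` and `finalCursor` are illustrative names, and it assumes
a backend that simply keeps whichever commit arrives last, with each commit
carrying the next-to-deliver offset (last claimed offset + 1) as in the diff
above.

```java
/**
 * Hypothetical simulation, not Beam or Pub/Sub Lite code. Each bundle commits its
 * next-to-deliver offset (last claimed offset + 1) for the same partition, and the
 * backend is ASSUMED to store whatever commit arrives last.
 */
final class CommitOrderingSketch {

  /** Returns the committed cursor after applying the commits in the given arrival order. */
  static long finalCursor(long... commitsInArrivalOrder) {
    long committed = 0; // assumed starting cursor: nothing committed yet
    for (long nextToDeliver : commitsInArrivalOrder) {
      // Last writer wins, so a late commit with a smaller offset moves the cursor backwards.
      committed = nextToDeliver;
    }
    return committed;
  }

  public static void main(String[] args) {
    // Bundle A claimed offsets 0..5 and commits 6; bundle B claimed 6..10 and commits 11.
    System.out.println("in order: " + finalCursor(6, 11)); // 11
    // If B's commit lands first, A's later commit rewinds the cursor to 6,
    // so offsets 6..10 would be delivered again after a restart.
    System.out.println("reversed: " + finalCursor(11, 6)); // 6
  }
}
```

   When the two commits are applied in reverse order, the committed cursor goes
backwards and offsets 6 through 10 would be redelivered, which is the
reprocessing described in point 1.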




----------------------------------------------------------------
This is an automated message from the Apache Git Service.