[ https://issues.apache.org/jira/browse/BEAM-10114?focusedWorklogId=557625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557625 ]

ASF GitHub Bot logged work on BEAM-10114:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/21 00:20
            Start Date: 25/Feb/21 00:20
    Worklog Time Spent: 10m 
      Work Description: dpcollins-google commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582400673



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
           .lastClaimed()
           .ifPresent(
               lastClaimedOffset ->
-                  finalizer.afterBundleCommit(
-                      Instant.ofEpochMilli(Long.MAX_VALUE),
-                      () -> {
-                        Committer committer = committerFactory.apply(subscriptionPartition);
-                        committer.startAsync().awaitRunning();
-                        // Commit the next-to-deliver offset.
-                        committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                        committer.stopAsync().awaitTerminated();
-                      }));
+              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
+                Committer committer = committerFactory.apply(subscriptionPartition);
+                committer.startAsync().awaitRunning();
+                // Commit the next-to-deliver offset.
+                try {
+                  committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();

Review comment:
   1) I don't know that there's a way to prevent reprocessing without a finalizer. Even the Kafka method (committing in a PTransform afterwards) can reprocess: PCollections are unordered, so commits can potentially (even if unlikely) land in the opposite order, leading to backwards commits. If I understand the execution model correctly, this can lead to duplicate commits only in the case of an internal Dataflow failure, in which case there is spurious progress anyway.
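
   (A minimal Java sketch of the backwards-commit hazard above, with hypothetical names; note that the guard below is per-worker only, so it cannot fully prevent out-of-order commits across an unordered PCollection:)

    import java.util.concurrent.atomic.AtomicLong;

    /** Drops commits that would move the committed offset backwards (per-worker only). */
    final class MonotonicCommitGuard {
      private final AtomicLong maxCommitted = new AtomicLong(Long.MIN_VALUE);

      /** Returns true iff this offset advanced the commit point. */
      boolean tryAdvance(long offset) {
        // getAndAccumulate returns the previous max and stores max(prev, offset).
        long prev = maxCommitted.getAndAccumulate(offset, Math::max);
        // A bundle that finishes late sees prev >= offset and is dropped here,
        // but one worker's guard knows nothing about another's: the
        // cross-worker ordering problem described above remains.
        return offset > prev;
      }
    }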
   
   2) For "pipeline crashes", do you mean that the entire Dataflow job can fail, or that the downstream processing just does throw new RuntimeException()? In the second case, doesn't BundleFinalizer also have this issue? If the first... I'm not sure it's worth protecting something in the runner against runner incorrectness.
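
   For reference, a minimal sketch of registering a bundle-finalization callback in a DoFn, mirroring the pre-change code in the diff above (the callback body is a placeholder, not the PR's implementation):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;

    class FinalizeCommitFn extends DoFn<Long, Long> {
      @ProcessElement
      public void processElement(
          @Element Long offset, OutputReceiver<Long> out, BundleFinalizer finalizer) {
        out.output(offset);
        // The callback runs only after the runner durably commits this
        // bundle's output, so a failure before that point re-runs the
        // bundle instead of firing the finalizer.
        finalizer.afterBundleCommit(
            Instant.ofEpochMilli(Long.MAX_VALUE), // effectively no expiry
            () -> {
              // Placeholder: commit offset + 1 to the source here.
            });
      }
    }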




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 557625)
    Time Spent: 24h  (was: 23h 50m)

> Add Pub/Sub Lite IO to beam builtin
> -----------------------------------
>
>                 Key: BEAM-10114
>                 URL: https://issues.apache.org/jira/browse/BEAM-10114
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Daniel Collins
>            Priority: P3
>          Time Spent: 24h
>  Remaining Estimate: 0h
>
> The IO currently lives [on the pubsub lite github|https://github.com/googleapis/java-pubsublite/tree/master/pubsublite-beam-io] but should be moved to being part of beam.
> As of release 2.28.0: The version in the beam repo is currently nonfunctional 
> on dataflow. Install and use com.google.cloud.pubsublite.beam.PubsubLiteIO, 
> found on the linked github repo.
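
A rough usage sketch of the external PubsubLiteIO mentioned in the description (the path string is a placeholder, and builder/method names are assumptions that may differ across library versions):

    import com.google.cloud.pubsublite.SubscriptionPath;
    import com.google.cloud.pubsublite.beam.PubsubLiteIO;
    import com.google.cloud.pubsublite.beam.SubscriberOptions;
    import org.apache.beam.sdk.Pipeline;

    public class ReadFromPubsubLite {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        pipeline.apply(
            "ReadPubsubLite",
            PubsubLiteIO.read(
                SubscriberOptions.newBuilder()
                    // Hypothetical project number, zone, and subscription name.
                    .setSubscriptionPath(SubscriptionPath.parse(
                        "projects/123456789/locations/us-central1-a/subscriptions/my-subscription"))
                    .build()));
        pipeline.run().waitUntilFinish();
      }
    }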



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
