[ https://issues.apache.org/jira/browse/BEAM-10114?focusedWorklogId=557623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557623 ]

ASF GitHub Bot logged work on BEAM-10114:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/21 00:14
            Start Date: 25/Feb/21 00:14
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582398133



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
           .lastClaimed()
           .ifPresent(
               lastClaimedOffset ->
-                  finalizer.afterBundleCommit(
-                      Instant.ofEpochMilli(Long.MAX_VALUE),
-                      () -> {
-                        Committer committer = committerFactory.apply(subscriptionPartition);
-                        committer.startAsync().awaitRunning();
-                        // Commit the next-to-deliver offset.
-                        committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                        committer.stopAsync().awaitTerminated();
-                      }));
+              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
+                Committer committer = committerFactory.apply(subscriptionPartition);
+                committer.startAsync().awaitRunning();
+                // Commit the next-to-deliver offset.
+                try {
+                  committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();

Review comment:
       There are several concerns about committing offsets inside the `DoFn`:
   
   - Dataflow may fail to process an element and then re-process it. In that case, the same offset might be committed more than once.
   - An offset may be committed successfully, but the pipeline may crash before downstream steps actually consume the output. That can result in data loss.
   
   If these concerns don't apply to your source, then it's fine to do so.
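   The ordering the review asks for can be sketched in plain Java without Beam. This is a simulation under stated assumptions, not the Beam or Pub/Sub Lite API: `MonotonicCommitter`, `Bundle`, `processBundle`, and `commitBundle` are hypothetical names. It illustrates two ideas. First, the offset commit is deferred until the bundle's output is durable (the role `finalizer.afterBundleCommit` plays in the removed lines), so a crash leads to redelivery rather than loss. Second, a commit that does not advance the offset is ignored, so a retried bundle's duplicate commit is harmless. As in the diff, the committed value is the next-to-deliver offset, `lastClaimed + 1`.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredCommitSketch {

  /** Hypothetical stand-in for a Pub/Sub Lite committer; not the real client API. */
  static final class MonotonicCommitter {
    private long committed = -1; // next-to-deliver offset last committed; -1 means none yet

    /** Commits only if the offset advances, so a duplicate commit from a retry is a no-op. */
    boolean commitOffset(long nextToDeliver) {
      if (nextToDeliver <= committed) {
        return false;
      }
      committed = nextToDeliver;
      return true;
    }

    long committed() {
      return committed;
    }
  }

  /** Hypothetical bundle: output and commit callbacks are held until the runner persists it. */
  static final class Bundle {
    final List<Long> pendingOutput = new ArrayList<>();
    final List<Runnable> finalizers = new ArrayList<>();
  }

  /** Emits offsets firstOffset..lastOffset and registers a deferred commit of lastOffset + 1. */
  static Bundle processBundle(long firstOffset, long lastOffset, MonotonicCommitter committer) {
    Bundle bundle = new Bundle();
    for (long offset = firstOffset; offset <= lastOffset; offset++) {
      bundle.pendingOutput.add(offset);
    }
    long lastClaimed = lastOffset;
    // Register the commit instead of running it inside processing.
    bundle.finalizers.add(() -> committer.commitOffset(lastClaimed + 1));
    return bundle;
  }

  /** Simulated runner: output becomes durable first, and only then do finalizers run. */
  static void commitBundle(Bundle bundle, List<Long> durableOutput, boolean crashBeforeCommit) {
    if (crashBeforeCommit) {
      return; // nothing persisted and no finalizer runs, so these offsets get redelivered
    }
    durableOutput.addAll(bundle.pendingOutput);
    bundle.finalizers.forEach(Runnable::run);
  }

  public static void main(String[] args) {
    MonotonicCommitter committer = new MonotonicCommitter();
    List<Long> durable = new ArrayList<>();

    // First attempt crashes: no output is persisted and no offset is committed.
    commitBundle(processBundle(0, 4, committer), durable, true);
    System.out.println("after crash: committed=" + committer.committed() + " durable=" + durable);

    // Retry succeeds: output is persisted, then next-to-deliver offset 5 is committed.
    commitBundle(processBundle(0, 4, committer), durable, false);
    System.out.println("after retry: committed=" + committer.committed() + " durable=" + durable);

    // Committing the same offset again is rejected.
    System.out.println("duplicate accepted? " + committer.commitOffset(5));
  }
}
```

   Running `main` prints `committed=-1 durable=[]` after the simulated crash, `committed=5 durable=[0, 1, 2, 3, 4]` after the retry, and `duplicate accepted? false`. Committing inside `processElement` would reverse the first two steps, which is exactly the data-loss window the second concern describes.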




----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 557623)
    Time Spent: 23h 50m  (was: 23h 40m)

> Add Pub/Sub Lite IO to beam builtin
> -----------------------------------
>
>                 Key: BEAM-10114
>                 URL: https://issues.apache.org/jira/browse/BEAM-10114
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Daniel Collins
>            Priority: P3
>          Time Spent: 23h 50m
>  Remaining Estimate: 0h
>
> The IO currently lives [on the Pub/Sub Lite GitHub|https://github.com/googleapis/java-pubsublite/tree/master/pubsublite-beam-io]
> but should be moved into Beam.
> As of release 2.28.0, the version in the Beam repo is nonfunctional on Dataflow. Install and use com.google.cloud.pubsublite.beam.PubsubLiteIO,
> found in the linked GitHub repo.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
