[ 
https://issues.apache.org/jira/browse/BEAM-10114?focusedWorklogId=557711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557711
 ]

ASF GitHub Bot logged work on BEAM-10114:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/21 03:48
            Start Date: 25/Feb/21 03:48
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582514995



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
           .lastClaimed()
           .ifPresent(
               lastClaimedOffset ->
-                  finalizer.afterBundleCommit(
-                      Instant.ofEpochMilli(Long.MAX_VALUE),
-                      () -> {
-                        Committer committer = 
committerFactory.apply(subscriptionPartition);
-                        committer.startAsync().awaitRunning();
-                        // Commit the next-to-deliver offset.
-                        
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                        committer.stopAsync().awaitTerminated();
-                      }));
+              /* TODO(boyuanzz): When default dataflow can use finalizers, 
undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
+                Committer committer = 
committerFactory.apply(subscriptionPartition);
+                committer.startAsync().awaitRunning();
+                // Commit the next-to-deliver offset.
+                try {
+                  committer.commitOffset(Offset.of(lastClaimedOffset.value() + 
1)).get();

Review comment:
       For both 1&2, as long as the committing offset doesn't means we are no 
longer able to read from that offset, I think that would be fine.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 557711)
    Time Spent: 24h 10m  (was: 24h)

> Add Pub/Sub Lite IO to beam builtin
> -----------------------------------
>
>                 Key: BEAM-10114
>                 URL: https://issues.apache.org/jira/browse/BEAM-10114
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Daniel Collins
>            Priority: P3
>          Time Spent: 24h 10m
>  Remaining Estimate: 0h
>
> The IO currently lives [on the pubsub lite 
> github|[https://github.com/googleapis/java-pubsublite/tree/master/pubsublite-beam-io]]
>  but should be moved to being part of beam.
> As of release 2.28.0: The version in the beam repo is currently nonfunctional 
> on dataflow. Install and use com.google.cloud.pubsublite.beam.PubsubLiteIO, 
> found on the linked github repo.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to