dpcollins-google commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582400673
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
.lastClaimed()
.ifPresent(
lastClaimedOffset ->
- finalizer.afterBundleCommit(
- Instant.ofEpochMilli(Long.MAX_VALUE),
- () -> {
- Committer committer = committerFactory.apply(subscriptionPartition);
- committer.startAsync().awaitRunning();
- // Commit the next-to-deliver offset.
- committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
- committer.stopAsync().awaitTerminated();
- }));
+ /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+ finalizer.afterBundleCommit(
+ Instant.ofEpochMilli(Long.MAX_VALUE),
+ () -> */ {
+ Committer committer = committerFactory.apply(subscriptionPartition);
+ committer.startAsync().awaitRunning();
+ // Commit the next-to-deliver offset.
+ try {
+ committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
Review comment:
1) I don't know that there's a way to prevent reprocessing without a
finalizer. Even the Kafka method (committing in a PTransform afterwards) can
cause it: PCollections are unordered, so commits can potentially (even if it is
unlikely) happen in the opposite order, leading to backwards commits. The
approach here, if I understand the execution model correctly, can lead to
duplicate commits only in the case of an internal Dataflow failure, in which
case there is spurious progress.
2) For "Pipeline crashes", do you mean that the entire Dataflow job can fail,
or that the downstream processing just does `throw new RuntimeException()`? In
the second case, doesn't BundleFinalizer also have this issue? In the first...
I'm not sure it's worth protecting something in the runner against runner
incorrectness.
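To make the backwards-commit concern in 1) concrete, here is a minimal sketch. `OffsetStore` is a hypothetical in-memory stand-in for a committed cursor, not a Pub/Sub Lite or Kafka API, and it assumes the commit call simply overwrites the cursor:

```java
import java.util.List;

// Hypothetical in-memory stand-in for a committed-offset cursor.
// This is an illustration only, not a real Pub/Sub Lite or Kafka API.
final class OffsetStore {
  private long committed = 0;

  // Naive commit: the last writer wins, whatever order commits arrive in.
  void commit(long offset) {
    committed = offset;
  }

  long committed() {
    return committed;
  }

  // Applies the given commits in arrival order and returns the final cursor.
  static long replay(List<Long> arrivals) {
    OffsetStore store = new OffsetStore();
    for (long offset : arrivals) {
      store.commit(offset);
    }
    return store.committed();
  }
}
```

Replaying the commits `[11, 21]` leaves the cursor at 21, while the reordered `[21, 11]` leaves it at 11, so offsets 11 through 20 would be delivered again after a restart.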