This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4da60251729 Stop emitting upon truncate in Java PeriodicSequence
(#25716)
4da60251729 is described below
commit 4da602517292adcd9ffcd7cc0acb8b0c1155aa02
Author: Yi Hu <[email protected]>
AuthorDate: Fri Mar 3 19:04:17 2023 -0500
Stop emitting upon truncate in Java PeriodicSequence (#25716)
---
.../java/org/apache/beam/sdk/transforms/PeriodicSequence.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index 6f0aeac859c..491b2efa078 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -194,6 +194,12 @@ public class PeriodicSequence
return new WatermarkEstimators.Manual(state);
}
+ @TruncateRestriction
+ public RestrictionTracker.TruncateResult<OffsetRange> truncate() {
+ // stop emitting immediately upon drain
+ return null;
+ }
+
@ProcessElement
public ProcessContinuation processElement(
@Element SequenceDefinition srcElement,
@@ -207,7 +213,7 @@ public class PeriodicSequence
boolean claimSuccess = true;
- estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom()));
+ estimator.setWatermark(Instant.ofEpochMilli(nextOutput));
while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
claimSuccess = restrictionTracker.tryClaim(nextOutput);