This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da60251729 Stop emitting upon truncate in Java PeriodicSequence 
(#25716)
4da60251729 is described below

commit 4da602517292adcd9ffcd7cc0acb8b0c1155aa02
Author: Yi Hu <[email protected]>
AuthorDate: Fri Mar 3 19:04:17 2023 -0500

    Stop emitting upon truncate in Java PeriodicSequence (#25716)
---
 .../java/org/apache/beam/sdk/transforms/PeriodicSequence.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index 6f0aeac859c..491b2efa078 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -194,6 +194,12 @@ public class PeriodicSequence
       return new WatermarkEstimators.Manual(state);
     }
 
+    @TruncateRestriction
+    public RestrictionTracker.TruncateResult<OffsetRange> truncate() {
+      // stop emitting immediately upon drain
+      return null;
+    }
+
     @ProcessElement
     public ProcessContinuation processElement(
         @Element SequenceDefinition srcElement,
@@ -207,7 +213,7 @@ public class PeriodicSequence
 
       boolean claimSuccess = true;
 
-      estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom()));
+      estimator.setWatermark(Instant.ofEpochMilli(nextOutput));
 
       while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
         claimSuccess = restrictionTracker.tryClaim(nextOutput);

Reply via email to