Abacn commented on code in PR #28272:
URL: https://github.com/apache/beam/pull/28272#discussion_r1316438641


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java:
##########
@@ -34,28 +37,57 @@
  */
 public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
 
-  Instant startTimestamp = Instant.now();
-  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-  Duration fireInterval = Duration.standardMinutes(1);
+  Instant startTimestamp;
+  Instant stopTimestamp;
+  @Nullable Duration stopDuration;
+  Duration fireInterval;
   boolean applyWindowing = false;
   boolean catchUpToNow = true;
 
-  private PeriodicImpulse() {}
+  private PeriodicImpulse() {
+    this.startTimestamp = Instant.now();
+    this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    this.fireInterval = Duration.standardMinutes(1);
+  }
 
   public static PeriodicImpulse create() {
     return new PeriodicImpulse();
   }
 
+  /**
+   * Assign a timestamp when the pipeliene starts to produce data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse startAt(Instant startTime) {
+    checkArgument(stopDuration == null, "startAt and stopAfter cannot be set 
at the same time");
     this.startTimestamp = startTime;
     return this;
   }
 
+  /**
+   * Assign a timestamp when the pipeliene stops producing data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse stopAt(Instant stopTime) {
+    checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at 
the same time");
     this.stopTimestamp = stopTime;
     return this;
   }
 
+  /**
+   * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
+   *
+   * <p>Assign a time interval at which the pipeliene produces data. This is 
different from setting
+   * {@link #startAt} and {@link #stopAt}, as the first timestamp is 
determined at run time
+   * (pipeline starts processing).
+   */
+  public PeriodicImpulse stopAfter(Duration duration) {

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to