lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer 
encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402689183
 
 

 ##########
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> 
windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
+  }
 
   /**
    * A {@link org.apache.beam.sdk.coders.Coder} for timers.
    *
-   * <p>This coder is deterministic if the payload coder is deterministic.
+   * <p>This coder is deterministic if both the key coder and window coder are 
deterministic.
    *
-   * <p>This coder is inexpensive for size estimation of elements if the 
payload coder is
-   * inexpensive for size estimation.
+   * <p>This coder is inexpensive for size estimation of elements if the key 
coder is inexpensive
+   * for size estimation.
 
 Review comment:
   ```suggestion
      * <p>This coder is inexpensive for size estimation of elements if the key 
coder and window coder are inexpensive
      * for size estimation.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to