lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer 
encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402426932
 
 

 ##########
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, 
fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, 
windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only 
when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be 
cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the clearBit is true.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the windows which is associated with the timer. This field is 
nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    } else {
+      Timer<?> that = (Timer<?>) other;
+
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && this.getFireTimestamp().equals(that.getFireTimestamp())
+          && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+          && Objects.equals(this.getWindows(), that.getWindows())
+          && Objects.equals(this.getPane(), that.getPane());
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
+  }
 
   /**
    * A {@link org.apache.beam.sdk.coders.Coder} for timers.
    *
-   * <p>This coder is deterministic if the payload coder is deterministic.
+   * <p>This coder is deterministic if both the key coder and window coder are 
deterministic.
    *
-   * <p>This coder is inexpensive for size estimation of elements if the 
payload coder is
-   * inexpensive for size estimation.
+   * <p>This coder is inexpensive for size estimation of elements if the key 
coder is inexpensive
+   * for size estimation.
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> 
payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
-
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends 
BoundedWindow>>
+        windowsCoder;
+    private final org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> 
windowCoder;
+
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      this.windowCoder = windowCoder;
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) 
CollectionCoder.of(windowCoder);
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws 
CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        windowsCoder.encode(timer.getWindows(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
 
 Review comment:
   This was missed during the doc review, but the timer will always need to 
encode the windowing information; otherwise, we won't know which windows the 
timer is in when we need to clear it.
   
   We'll need to update the documentation for this as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to