lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402689183
########## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ########## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer<T> { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer<Void> of(Instant time) { - return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static <T> Timer<T> of( + T userKey, + String dynamicTimerTag, + Collection<? extends BoundedWindow> windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { + return new AutoValue_Timer( + userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) { - return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static <T> Timer<T> cleared( + T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) { + return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. + */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection<? extends BoundedWindow> getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * <p>The time is relative to the time domain defined in the {@link + * <p>The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. + */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { + if (!(other instanceof Timer)) { + return false; + } + Timer<?> that = (Timer<?>) other; + if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); + } + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && this.getFireTimestamp().equals(that.getFireTimestamp()) + && this.getHoldTimestamp().equals(that.getHoldTimestamp()) + && Objects.equals(this.getWindows(), that.getWindows()) + && Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { + // Hash only the millis of the timestamp to be consistent with equals + if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); + } + return Objects.hash( + getUserKey(), + getDynamicTimerTag(), + getClearBit(), + getFireTimestamp().getMillis(), + getHoldTimestamp().getMillis(), + getWindows(), + getPane()); + } /** * A {@link org.apache.beam.sdk.coders.Coder} for timers. * - * <p>This coder is deterministic if the payload coder is deterministic. + * <p>This coder is deterministic if both the key coder and window coder are deterministic. * - * <p>This coder is inexpensive for size estimation of elements if the payload coder is - * inexpensive for size estimation. + * <p>This coder is inexpensive for size estimation of elements if the key coder is inexpensive + * for size estimation. Review comment: ```suggestion * <p>This coder is inexpensive for size estimation of elements if the key coder and window coder are inexpensive * for size estimation. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services