lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402426932
########## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ########## @@ -40,92 +50,197 @@ @AutoValue public abstract class Timer<T> { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer<Void> of(Instant time) { - return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp, + * holdTimestamp, windows and pane. + */ + public static <T> Timer<T> of( + T userKey, + String dynamicTimerTag, + Instant fireTimestamp, + Instant holdTimestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + return new AutoValue_Timer( + userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) { - return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag. */ + public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) { + return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is empty string only when the timer is set + * from TimerSpec. + */ + public abstract String getDynamicTimerTag(); + + /** Returns a boolean which indicate whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the clearBit is true. * * <p>The time is relative to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the clearBit is true. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the windows which is associated with the timer. This field is nullable only when the + * clearBit is true. + */ + @Nullable + public abstract Collection<? extends BoundedWindow> getWindows(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * clearBit is true. + */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { + if (!(other instanceof Timer)) { + return false; + } else { + Timer<?> that = (Timer<?>) other; + + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && this.getFireTimestamp().equals(that.getFireTimestamp()) + && this.getHoldTimestamp().equals(that.getHoldTimestamp()) + && Objects.equals(this.getWindows(), that.getWindows()) + && Objects.equals(this.getPane(), that.getPane()); + } + } + + @Override + public int hashCode() { + // Hash only the millis of the timestamp to be consistent with equals + return Objects.hash( + getUserKey(), + getDynamicTimerTag(), + getClearBit(), + getFireTimestamp().getMillis(), + getHoldTimestamp().getMillis(), + getWindows(), + getPane()); + } /** * A {@link org.apache.beam.sdk.coders.Coder} for timers. * - * <p>This coder is deterministic if the payload coder is deterministic. + * <p>This coder is deterministic if both the key coder and window coder are deterministic. * - * <p>This coder is inexpensive for size estimation of elements if the payload coder is - * inexpensive for size estimation. + * <p>This coder is inexpensive for size estimation of elements if the key coder is inexpensive + * for size estimation. */ public static class Coder<T> extends StructuredCoder<Timer<T>> { - public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) { - return new Coder(payloadCoder); + public static <T> Coder<T> of( + org.apache.beam.sdk.coders.Coder<T> keyCoder, + org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) { + return new Coder<>(keyCoder, windowCoder); } - private final org.apache.beam.sdk.coders.Coder<T> payloadCoder; - - private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) { - this.payloadCoder = payloadCoder; + private final org.apache.beam.sdk.coders.Coder<T> keyCoder; + private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>> + windowsCoder; + private final org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder; + + private Coder( + org.apache.beam.sdk.coders.Coder<T> keyCoder, + org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) { + this.windowCoder = windowCoder; + this.keyCoder = keyCoder; + this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder); } @Override public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException { - InstantCoder.of().encode(timer.getTimestamp(), outStream); - payloadCoder.encode(timer.getPayload(), outStream); + keyCoder.encode(timer.getUserKey(), outStream); + StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream); + BooleanCoder.of().encode(timer.getClearBit(), outStream); + if (!timer.getClearBit()) { + InstantCoder.of().encode(timer.getFireTimestamp(), outStream); + InstantCoder.of().encode(timer.getHoldTimestamp(), outStream); + windowsCoder.encode(timer.getWindows(), outStream); + PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream); + } Review comment: This was missed during the doc review but the timer will always need to encode the windowing information otherwise we won't know what windows the timer is in to clear. We'll need to update the documentation for this as well. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services