WindowingStrategy: add OnTimeBehavior to control whether to emit empty ON_TIME pane.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38dd12df Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38dd12df Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38dd12df Branch: refs/heads/gearpump-runner Commit: 38dd12df6dee2ada31ad9c52f8d9dc99225f1bc2 Parents: b1ece01 Author: Pei He <[email protected]> Authored: Tue Jun 20 16:09:26 2017 -0700 Committer: Pei He <[email protected]> Committed: Thu Jun 29 14:01:54 2017 +0800 ---------------------------------------------------------------------- .../WindowingStrategyTranslation.java | 26 ++- .../beam/runners/core/ReduceFnRunner.java | 6 +- .../beam/runners/core/ReduceFnRunnerTest.java | 161 +++++++++++++++++++ .../src/main/proto/beam_runner_api.proto | 14 ++ .../beam/sdk/transforms/windowing/Window.java | 32 ++++ .../beam/sdk/values/WindowingStrategy.java | 46 ++++-- 6 files changed, 273 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 718efe7..88ebc01 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import 
org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.WindowingStrategy; @@ -119,6 +120,27 @@ public class WindowingStrategyTranslation implements Serializable { } } + + public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) { + switch (proto) { + case FIRE_ALWAYS: + return OnTimeBehavior.FIRE_ALWAYS; + case FIRE_IF_NONEMPTY: + return OnTimeBehavior.FIRE_IF_NON_EMPTY; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OnTimeBehavior means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.OnTimeBehavior.class.getCanonicalName(), + OnTimeBehavior.class.getCanonicalName(), + proto)); + } + } + public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { switch(timestampCombiner) { case EARLIEST: @@ -323,13 +345,15 @@ public class WindowingStrategyTranslation implements Serializable { Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger()); ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); + OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior()); return WindowingStrategy.of(windowFn) .withAllowedLateness(allowedLateness) .withMode(accumulationMode) .withTrigger(trigger) .withTimestampCombiner(timestampCombiner) - .withClosingBehavior(closingBehavior); + .withClosingBehavior(closingBehavior) + .withOnTimeBehavior(onTimeBehavior); } public static WindowFn<?, ?> 
windowFnFromProto(SdkFunctionSpec windowFnSpec) http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 75b6acd..a33bac1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; @@ -920,8 +921,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // The pane has elements. return true; } - if (timing == Timing.ON_TIME) { - // This is the unique ON_TIME pane. + if (timing == Timing.ON_TIME + && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS) { + // This is an empty ON_TIME pane. 
return true; } if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) { http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 4f68038..3a2c220 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; @@ -1423,6 +1424,166 @@ public class ReduceFnRunnerTest { } /** + * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY. 
+ */ + @Test + public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception { + + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withTrigger( + AfterEach.<IntervalWindow>inOrder( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(5))) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(25))))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) + .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceInputWatermark(new Instant(0)); + tester.advanceProcessingTime(new Instant(0)); + + // Processing time timer for 5 + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(1, new Instant(3)), + TimestampedValue.of(1, new Instant(7)), + TimestampedValue.of(1, new Instant(5))); + + // Should fire early pane + tester.advanceProcessingTime(new Instant(6)); + + // Should not fire empty on time pane + tester.advanceInputWatermark(new Instant(11)); + + // Should fire final GC pane + tester.advanceInputWatermark(new Instant(10 + 100)); + List<WindowedValue<Integer>> output = tester.extractOutput(); + assertEquals(2, output.size()); + + assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10)); + assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10)); + + assertThat( + output.get(0), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); + assertThat( + output.get(1), + 
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0))); + } + + /** + * Test that it fires an empty on-time isFinished pane when OnTimeBehavior is FIRE_ALWAYS + * and ClosingBehavior is FIRE_IF_NON_EMPTY. + * + * <p>This is a test just for backward compatibility. + */ + @Test + public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception { + + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withTrigger(AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(0)) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceInputWatermark(new Instant(0)); + tester.advanceProcessingTime(new Instant(0)); + + tester.injectElements( + TimestampedValue.of(1, new Instant(1))); + + // Should fire empty on time isFinished pane + tester.advanceInputWatermark(new Instant(11)); + + List<WindowedValue<Integer>> output = tester.extractOutput(); + assertEquals(2, output.size()); + + assertThat( + output.get(0), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); + assertThat( + output.get(1), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0))); + } + + /** + * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY + * and when receiving late data. 
+ */ + @Test + public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception { + + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withTrigger( + AfterEach.<IntervalWindow>inOrder( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(5))) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(25))))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceInputWatermark(new Instant(0)); + tester.advanceProcessingTime(new Instant(0)); + + // Processing time timer for 5 + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(1, new Instant(3)), + TimestampedValue.of(1, new Instant(7)), + TimestampedValue.of(1, new Instant(5))); + + // Should fire early pane + tester.advanceProcessingTime(new Instant(6)); + + // Should not fire empty on time pane + tester.advanceInputWatermark(new Instant(11)); + + // Processing late data, and should fire late pane + tester.injectElements( + TimestampedValue.of(1, new Instant(9))); + tester.advanceProcessingTime(new Instant(6 + 25 + 1)); + + List<WindowedValue<Integer>> output = tester.extractOutput(); + assertEquals(2, output.size()); + + assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10)); + assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(5, 9, 0, 10)); + + assertThat( + output.get(0), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); + assertThat( + 
output.get(1), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0))); + } + + /** * Tests for processing time firings after the watermark passes the end of the window. * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late * when the on-time pane is non-empty. http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 039ecb0..24e907a 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -433,6 +433,9 @@ message WindowingStrategy { // (Required) The duration, in milliseconds, beyond the end of a window at // which the window becomes droppable. int64 allowed_lateness = 8; + + // (Required) Indicate whether empty on-time panes should be omitted. + OnTimeBehavior OnTimeBehavior = 9; } // Whether or not a PCollection's WindowFn is non-merging, merging, or @@ -478,6 +481,17 @@ enum ClosingBehavior { EMIT_IF_NONEMPTY = 1; } +// Controls whether or not an aggregating transform should output data +// when an on-time pane is empty. +enum OnTimeBehavior { + // Always fire the on-time pane. Even if there is no new data since + // the previous firing, an element will be produced. + FIRE_ALWAYS = 0; + + // Only fire the on-time pane if there is new data since the previous firing. + FIRE_IF_NONEMPTY = 1; +} + // When a number of windowed, timestamped inputs are aggregated, the timestamp // for the resulting output. 
enum OutputTime { http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 105ebfb..a12be6d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -163,6 +163,24 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** + * Specifies whether an on-time pane is emitted when the watermark passes the end of a window. + */ + public enum OnTimeBehavior { + /** + * Always fire the on-time pane. Even if there is no new data since the previous firing, + * an element will be produced. + * + * <p>This is the default behavior. + */ + FIRE_ALWAYS, + /** + * Only fire the on-time pane if there is new data since the previous firing. + */ + FIRE_IF_NON_EMPTY + + } + + /** * Creates a {@code Window} {@code PTransform} that uses the given * {@link WindowFn} to window the data. 
* @@ -195,6 +213,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T @Nullable abstract AccumulationMode getAccumulationMode(); @Nullable abstract Duration getAllowedLateness(); @Nullable abstract ClosingBehavior getClosingBehavior(); + @Nullable abstract OnTimeBehavior getOnTimeBehavior(); @Nullable abstract TimestampCombiner getTimestampCombiner(); abstract Builder<T> toBuilder(); @@ -206,6 +225,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T abstract Builder<T> setAccumulationMode(AccumulationMode mode); abstract Builder<T> setAllowedLateness(Duration allowedLateness); abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); + abstract Builder<T> setOnTimeBehavior(OnTimeBehavior onTimeBehavior); abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner); abstract Window<T> build(); @@ -299,6 +319,15 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** + * <b><i>(Experimental)</i></b> Override the default {@link OnTimeBehavior}, to control + * whether to output an empty on-time pane. + */ + @Experimental(Kind.TRIGGER) + public Window<T> withOnTimeBehavior(OnTimeBehavior behavior) { + return toBuilder().setOnTimeBehavior(behavior).build(); + } + + /** * Get the output strategy of this {@link Window Window PTransform}. For internal use * only. 
*/ @@ -321,6 +350,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T if (getClosingBehavior() != null) { result = result.withClosingBehavior(getClosingBehavior()); } + if (getOnTimeBehavior() != null) { + result = result.withOnTimeBehavior(getOnTimeBehavior()); + } if (getTimestampCombiner() != null) { result = result.withTimestampCombiner(getTimestampCombiner()); } http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java index 8a773e2..3b74e69 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Duration; @@ -59,6 +60,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab private final AccumulationMode mode; private final Duration allowedLateness; private final ClosingBehavior closingBehavior; + private final OnTimeBehavior onTimeBehavior; private final TimestampCombiner timestampCombiner; private final boolean triggerSpecified; private final boolean modeSpecified; @@ -71,7 +73,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab AccumulationMode mode, boolean modeSpecified, Duration allowedLateness, boolean 
allowedLatenessSpecified, TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, - ClosingBehavior closingBehavior) { + ClosingBehavior closingBehavior, + OnTimeBehavior onTimeBehavior) { this.windowFn = windowFn; this.trigger = trigger; this.triggerSpecified = triggerSpecified; @@ -80,6 +83,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab this.allowedLateness = allowedLateness; this.allowedLatenessSpecified = allowedLatenessSpecified; this.closingBehavior = closingBehavior; + this.onTimeBehavior = onTimeBehavior; this.timestampCombiner = timestampCombiner; this.timestampCombinerSpecified = timestampCombinerSpecified; } @@ -98,7 +102,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, TimestampCombiner.END_OF_WINDOW, false, - ClosingBehavior.FIRE_IF_NON_EMPTY); + ClosingBehavior.FIRE_IF_NON_EMPTY, + OnTimeBehavior.FIRE_ALWAYS); } public WindowFn<T, W> getWindowFn() { @@ -133,6 +138,10 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return closingBehavior; } + public OnTimeBehavior getOnTimeBehavior() { + return onTimeBehavior; + } + public TimestampCombiner getTimestampCombiner() { return timestampCombiner; } @@ -152,7 +161,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior); + closingBehavior, + onTimeBehavior); } /** @@ -166,7 +176,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, true, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior); + closingBehavior, + onTimeBehavior); } /** @@ -183,7 +194,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, 
allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior); + closingBehavior, + onTimeBehavior); } /** @@ -197,7 +209,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, true, timestampCombiner, timestampCombinerSpecified, - closingBehavior); + closingBehavior, + onTimeBehavior); } public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) { @@ -207,7 +220,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior); + closingBehavior, + onTimeBehavior); + } + + public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) { + return new WindowingStrategy<T, W>( + windowFn, + trigger, triggerSpecified, + mode, modeSpecified, + allowedLateness, allowedLatenessSpecified, + timestampCombiner, timestampCombinerSpecified, + closingBehavior, + onTimeBehavior); } @Experimental(Experimental.Kind.OUTPUT_TIME) @@ -219,7 +244,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, true, - closingBehavior); + closingBehavior, + onTimeBehavior); } @Override @@ -246,6 +272,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab && getMode().equals(other.getMode()) && getAllowedLateness().equals(other.getAllowedLateness()) && getClosingBehavior().equals(other.getClosingBehavior()) + && getOnTimeBehavior().equals(other.getOnTimeBehavior()) && getTrigger().equals(other.getTrigger()) && getTimestampCombiner().equals(other.getTimestampCombiner()) && getWindowFn().equals(other.getWindowFn()); @@ -278,6 +305,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, true, allowedLateness, true, 
timestampCombiner, true, - closingBehavior); + closingBehavior, + onTimeBehavior); } }
