Repository: incubator-beam Updated Branches: refs/heads/master 41faee4f9 -> bf67d8edc
Remove TriggerBuilder backwards-compatibility adapter Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/244e8e85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/244e8e85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/244e8e85 Branch: refs/heads/master Commit: 244e8e85596c944b23e37e59a717be314c496f74 Parents: 82ae661 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 23 21:03:29 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 23 21:32:03 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/util/ReduceFnRunnerTest.java | 3 +- .../apache/beam/sdk/util/ReduceFnTester.java | 5 ++- .../flink/streaming/GroupAlsoByWindowTest.java | 2 +- .../transforms/windowing/AfterWatermark.java | 38 ++++---------------- .../beam/sdk/transforms/windowing/Trigger.java | 7 +--- .../transforms/windowing/TriggerBuilder.java | 29 --------------- .../beam/sdk/transforms/windowing/Window.java | 10 +++--- .../windowing/AfterProcessingTimeTest.java | 3 +- .../windowing/AfterWatermarkTest.java | 24 ++++++------- .../org/apache/beam/sdk/util/TriggerTester.java | 11 +++--- 10 files changed, 33 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 64fcae3..cd78107 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -111,7 +111,6 @@ public class ReduceFnRunnerTest { MockitoAnnotations.initMocks(this); mockTrigger = mock(Trigger.class, withSettings().serializable()); - when(mockTrigger.buildTrigger()).thenReturn(mockTrigger); @SuppressWarnings("unchecked") PCollectionView<Integer> mockViewUnchecked = @@ -271,7 +270,7 @@ public class ReduceFnRunnerTest { ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( windowFn, - AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(), + AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()), AccumulationMode.DISCARDING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java index e897f54..fa62583 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.TriggerBuilder; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -131,12 +130,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W> - nonCombining(WindowFn<?, W> windowFn, TriggerBuilder trigger, AccumulationMode mode, + nonCombining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode, Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception { WindowingStrategy<?, W> strategy = WindowingStrategy.of(windowFn) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withTrigger(trigger.buildTrigger()) + .withTrigger(trigger) .withMode(mode) .withAllowedLateness(allowedDataLateness) .withClosingBehavior(closingBehavior); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java index 207fb5a..2d83fb6 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -83,7 +83,7 @@ public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase { private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy = fixedWindowingStrategy.withTrigger( AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5)) - .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger()); + .withLateFirings(AfterPane.elementCountAtLeast(5))); /** * The default accumulation mode is http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 019a68d..0d2a878 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -78,31 +78,9 @@ public class AfterWatermark { } /** - * Interface for building an AfterWatermarkTrigger with early firings already filled in. + * @see AfterWatermark */ - public interface AfterWatermarkEarly extends TriggerBuilder { - /** - * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires after the watermark has passed the end of the window. - */ - TriggerBuilder withLateFirings(OnceTrigger lateTrigger); - } - - /** - * Interface for building an AfterWatermarkTrigger with late firings already filled in. - */ - public interface AfterWatermarkLate extends TriggerBuilder { - /** - * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires before the watermark has passed the end of the window. - */ - TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); - } - - - private static class AfterWatermarkEarlyAndLate - extends Trigger - implements TriggerBuilder, AfterWatermarkEarly, AfterWatermarkLate { + public static class AfterWatermarkEarlyAndLate extends Trigger { private static final int EARLY_INDEX = 0; private static final int LATE_INDEX = 1; @@ -112,7 +90,7 @@ public class AfterWatermark { private final OnceTrigger lateTrigger; @SuppressWarnings("unchecked") - private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { + public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { super(lateTrigger == null ? ImmutableList.<Trigger>of(earlyTrigger) : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger)); @@ -120,13 +98,11 @@ public class AfterWatermark { this.lateTrigger = lateTrigger; } - @Override - public TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger) { + public Trigger withEarlyFirings(OnceTrigger earlyTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } - @Override - public TriggerBuilder withLateFirings(OnceTrigger lateTrigger) { + public Trigger withLateFirings(OnceTrigger lateTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } @@ -301,7 +277,7 @@ public class AfterWatermark { * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires before the watermark has passed the end of the window. */ - public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) { + public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) { checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); return new AfterWatermarkEarlyAndLate(earlyFirings, null); } @@ -310,7 +286,7 @@ public class AfterWatermark { * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires after the watermark has passed the end of the window. */ - public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) { + public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) { checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index e97d3bd..86801e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -93,7 +93,7 @@ import javax.annotation.Nullable; * state before the callback returns. */ @Experimental(Experimental.Kind.TRIGGER) -public abstract class Trigger implements Serializable, TriggerBuilder { +public abstract class Trigger implements Serializable { /** * Interface for accessing information about the trigger being executed and other triggers in the @@ -495,11 +495,6 @@ public abstract class Trigger implements Serializable, TriggerBuilder { return new OrFinallyTrigger(this, until); } - @Override - public Trigger buildTrigger() { - return this; - } - /** * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather * than the general {@link Trigger} class to indicate that behavior. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerBuilder.java deleted file mode 100644 index e8bd52b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerBuilder.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -/** - * Anything that can be used to create an instance of a {@code Trigger} implements this interface. - * - * <p>This includes {@code Trigger}s (which can return themselves) and any "enhanced" syntax for - * constructing a trigger. - */ -public interface TriggerBuilder { - /** Return the {@code Trigger} built by this builder. */ - Trigger buildTrigger(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 324b4d5..86c87a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -87,7 +87,7 @@ import javax.annotation.Nullable; * * <h2> Triggers </h2> * - * <p>{@link Window.Bound#triggering(TriggerBuilder)} allows specifying a trigger to control when + * <p>{@link Window.Bound#triggering(Trigger)} allows specifying a trigger to control when * (in processing time) results for the given window can be produced. If unspecified, the default * behavior is to trigger first when the watermark passes the end of the window, and then trigger * again every time there is late arriving data. @@ -195,7 +195,7 @@ public class Window { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public static <T> Bound<T> triggering(TriggerBuilder trigger) { + public static <T> Bound<T> triggering(Trigger trigger) { return new Unbound().triggering(trigger); } @@ -290,7 +290,7 @@ public class Window { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public <T> Bound<T> triggering(TriggerBuilder trigger) { + public <T> Bound<T> triggering(Trigger trigger) { return new Bound<T>(name).triggering(trigger); } @@ -433,11 +433,11 @@ public class Window { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public Bound<T> triggering(TriggerBuilder trigger) { + public Bound<T> triggering(Trigger trigger) { return new Bound<T>( name, windowFn, - trigger.buildTrigger(), + trigger, mode, allowedLateness, closingBehavior, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 8d2b4a1..ea9c2b0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -176,8 +176,7 @@ public class AfterProcessingTimeTest { Trigger trigger = AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime .pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(10))) - .buildTrigger(); + .plusDelayOf(Duration.standardMinutes(10))); String expected = "AfterWatermark.pastEndOfWindow()" + ".withLateFirings(AfterProcessingTime" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java index d692cbf..418f746 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -346,28 +346,24 @@ public class AfterWatermarkTest { @Test public void testEarlyFiringsToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow() - .withEarlyFirings(StubTrigger.named("t1")) - .buildTrigger(); + Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1")); assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString()); } @Test public void testLateFiringsToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow() - .withLateFirings(StubTrigger.named("t1")) - .buildTrigger(); + Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1")); assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); } @Test public void testEarlyAndLateFiringsToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow() - .withEarlyFirings(StubTrigger.named("t1")) - .withLateFirings(StubTrigger.named("t2")) - .buildTrigger(); + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(StubTrigger.named("t1")) + .withLateFirings(StubTrigger.named("t2")); assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", trigger.toString()); @@ -375,10 +371,10 @@ public class AfterWatermarkTest { @Test public void testToStringExcludesNeverTrigger() { - Trigger trigger = AfterWatermark.pastEndOfWindow() - .withEarlyFirings(Never.ever()) - .withLateFirings(Never.ever()) - .buildTrigger(); + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(Never.ever()) + .withLateFirings(Never.ever()); assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/244e8e85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index c495712..ba42c37 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.TriggerBuilder; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -108,7 +107,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { private final Map<W, W> windowToMergeResult; /** - * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link TriggerBuilder} + * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger} * under test. */ private final ExecutableTrigger executableTrigger; @@ -119,10 +118,10 @@ public class TriggerTester<InputT, W extends BoundedWindow> { private final Map<W, FinishedTriggers> finishedSets; public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger( - TriggerBuilder trigger, WindowFn<Object, W> windowFn) + Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { WindowingStrategy<Object, W> windowingStrategy = - WindowingStrategy.of(windowFn).withTrigger(trigger.buildTrigger()) + WindowingStrategy.of(windowFn).withTrigger(trigger) // Merging requires accumulation mode or early firings can break up a session. // Not currently an issue with the tester (because we never GC) but we don't want // mystery failures due to violating this need. @@ -134,9 +133,9 @@ public class TriggerTester<InputT, W extends BoundedWindow> { } public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger( - TriggerBuilder trigger, WindowFn<Object, W> windowFn) throws Exception { + Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { WindowingStrategy<Object, W> strategy = - WindowingStrategy.of(windowFn).withTrigger(trigger.buildTrigger()) + WindowingStrategy.of(windowFn).withTrigger(trigger) // Merging requires accumulation mode or early firings can break up a session. // Not currently an issue with the tester (because we never GC) but we don't want // mystery failures due to violating this need.
