Repository: incubator-beam Updated Branches: refs/heads/master bf67d8edc -> bd21ead63
Use only WindowFn in TriggerTester This change is preparatory for separating trigger syntax from implementation. Previously, the whole WindowingStrategy was passed in, but not used. Since the tester is really a test of the state machine, it will be moved to runners-core alongside the trigger implementation. The requirement to provide a WindowingStrategy with the original syntax is extraneous. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2cdc2be4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2cdc2be4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2cdc2be4 Branch: refs/heads/master Commit: 2cdc2be45cd7722232b56e8fdef5670b33c337d8 Parents: 82ae661 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 23 21:24:22 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 23 21:30:08 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/util/ReduceFnRunner.java | 3 ++- .../org/apache/beam/sdk/util/TriggerContextFactory.java | 11 ++++++----- .../java/org/apache/beam/sdk/util/TriggerTester.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 864e8e7..2efc859 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -236,7 +236,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { this.triggerRunner = new TriggerRunner<>( windowingStrategy.getTrigger(), - new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows)); + new TriggerContextFactory<>( + windowingStrategy.getWindowFn(), stateInternals, activeWindows)); } private ActiveWindowSet<W> createActiveWindowSet() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java index 4855654..f7635d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo; import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateAccessor; @@ -50,19 +51,19 @@ import javax.annotation.Nullable; */ public class TriggerContextFactory<W extends BoundedWindow> { - private final WindowingStrategy<?, W> windowingStrategy; + private final WindowFn<?, W> windowFn; private StateInternals<?> stateInternals; // Future triggers may be able to exploit the active window to state address window mapping. @SuppressWarnings("unused") private ActiveWindowSet<W> activeWindows; private final Coder<W> windowCoder; - public TriggerContextFactory(WindowingStrategy<?, W> windowingStrategy, + public TriggerContextFactory(WindowFn<?, W> windowFn, StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) { - this.windowingStrategy = windowingStrategy; + this.windowFn = windowFn; this.stateInternals = stateInternals; this.activeWindows = activeWindows; - this.windowCoder = windowingStrategy.getWindowFn().windowCoder(); + this.windowCoder = windowFn.windowCoder(); } public Trigger.TriggerContext base(W window, Timers timers, @@ -106,7 +107,7 @@ public class TriggerContextFactory<W extends BoundedWindow> { @Override public boolean isMerging() { - return !windowingStrategy.getWindowFn().isNonMerging(); + return !windowFn.isNonMerging(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index c495712..5af9ae9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -160,7 +160,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { this.windowToMergeResult = new HashMap<>(); this.contextFactory = - new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows); + new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows); } /**
