Add a ReduceFnTester assertion for windows that have buffered data
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/795760d3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/795760d3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/795760d3 Branch: refs/heads/gearpump-runner Commit: 795760d370bcbe28e1f0ca373ad4c8c841e6e6b5 Parents: 1c1f239 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 12:53:15 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/core/SystemReduceFn.java | 6 ++++++ .../org/apache/beam/runners/core/ReduceFnTester.java | 13 +++++++++++++ 2 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index c189b0d..3144bd6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core; +import com.google.common.annotations.VisibleForTesting; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; @@ -103,6 +104,11 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound this.bufferTag = bufferTag; } + @VisibleForTesting + StateTag<? 
extends GroupingState<InputT, OutputT>> getBufferTag() { + return bufferTag; + } + @Override public void processValue(ProcessValueContext c) throws Exception { c.state().access(bufferTag).add(c.value()); http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index ab9fd6e..1fe8f73 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -318,6 +318,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } @SafeVarargs + public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) { + assertHasOnlyGlobalAndAllowedTags( + ImmutableSet.copyOf(expectedWindows), + ImmutableSet.<StateTag<?>>of( + ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(), + TriggerStateMachineRunner.FINISHED_BITS_TAG, + PaneInfoTracker.PANE_INFO_TAG, + WatermarkHold.watermarkHoldTagForTimestampCombiner( + objectStrategy.getTimestampCombiner()), + WatermarkHold.EXTRA_HOLD_TAG)); + } + + @SafeVarargs public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows),
