Add test reproducing BEAM-2505, ignored
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fda589c0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fda589c0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fda589c0 Branch: refs/heads/gearpump-runner Commit: fda589c00c8920e76cfc9aaa87cecfa94077599d Parents: 50c43d9 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 13:04:23 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunnerTest.java | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fda589c0/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 2b66162..fa5ba8b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -78,6 +78,7 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -349,6 +350,36 @@ public class ReduceFnRunnerTest { assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55)))); } + /** + * Tests that if end-of-window and GC timers come in together, the pane is correctly + * marked as final. 
+ */ + @Test + @Ignore("https://issues.apache.org/jira/browse/BEAM-2505") + public void testCombiningAccumulatingEventTime() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(1)) + .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 + injectElement(tester, 5); + + tester.advanceInputWatermark(new Instant(1000)); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); + } + + @Test public void testOnElementCombiningAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode.
