Add test reproducing BEAM-2505, ignored

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fda589c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fda589c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fda589c0

Branch: refs/heads/gearpump-runner
Commit: fda589c00c8920e76cfc9aaa87cecfa94077599d
Parents: 50c43d9
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 22 13:04:23 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fda589c0/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 2b66162..fa5ba8b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -349,6 +350,36 @@ public class ReduceFnRunnerTest {
     assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
   }
 
+  /**
+   * Tests that if end-of-window and GC timers come in together, that the pane is correctly
+   * marked as final.
+   */
+  @Test
+  @Ignore("https://issues.apache.org/jira/browse/BEAM-2505")
+  public void testCombiningAccumulatingEventTime() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(1))
+            .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermark(new Instant(1000));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
+  }
+
+
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.

Reply via email to