http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java deleted file mode 100644 index fb2b4d5..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link Never}. 
- */ -@RunWith(JUnit4.class) -public class NeverTest { - private SimpleTriggerTester<IntervalWindow> triggerTester; - - @Before - public void setup() throws Exception { - triggerTester = - TriggerTester.forTrigger( - Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); - } - - @Test - public void falseAfterEndOfWindow() throws Exception { - triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); - IntervalWindow window = - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); - assertThat(triggerTester.shouldFire(window), is(false)); - triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - assertThat(triggerTester.shouldFire(window), is(false)); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java deleted file mode 100644 index 7289d97..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link OrFinallyTrigger}. - */ -@RunWith(JUnit4.class) -public class OrFinallyTriggerTest { - - private SimpleTriggerTester<IntervalWindow> tester; - - /** - * Tests that for {@code OrFinally(actual, ...)} when {@code actual} - * fires and finishes, the {@code OrFinally} also fires and finishes. - */ - @Test - public void testActualFiresAndFinishes() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(100)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // Not yet firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires and finishes - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that for {@code OrFinally(actual, ...)} when {@code actual} - * fires but does not finish, the {@code OrFinally} also fires and also does not - * finish. 
- */ - @Test - public void testActualFiresOnly() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(100)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // Not yet firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires but does not finish - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // And again - tester.injectElements(3, 4); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - } - - /** - * Tests that if the first trigger rewinds to be non-finished in the merged window, - * then it becomes the currently active trigger again, with real triggers. 
- */ - @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterPane.elementCountAtLeast(5) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - // Finished the orFinally in the first window - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Set up second window where it is not done - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now adding 3 more makes the main trigger ready to fire - tester.injectElements(1, 2, 3, 4, 5); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that for {@code OrFinally(actual, until)} when {@code actual} - * fires but does not finish, then {@code until} fires and finishes, the - * whole thing fires and finished. 
- */ - @Test - public void testActualFiresButUntilFinishes() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(3)), - FixedWindows.of(Duration.millis(10))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - // Before any firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires but doesn't finish - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // The until fires and finishes; the trigger is finished - tester.injectElements(3); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testFireDeadline() throws Exception { - BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - assertEquals(new Instant(9), - Repeatedly.forever(AfterWatermark.pastEndOfWindow()) - .getWatermarkThatGuaranteesFiring(window)); - assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow()) - .orFinally(AfterPane.elementCountAtLeast(1)) - .getWatermarkThatGuaranteesFiring(window)); - assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1)) - .orFinally(AfterWatermark.pastEndOfWindow()) - .getWatermarkThatGuaranteesFiring(window)); - assertEquals(new Instant(9), - AfterPane.elementCountAtLeast(100) - .orFinally(AfterWatermark.pastEndOfWindow()) - .getWatermarkThatGuaranteesFiring(window)); - - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - Repeatedly.forever(AfterPane.elementCountAtLeast(1)) - .orFinally(AfterPane.elementCountAtLeast(10)) - .getWatermarkThatGuaranteesFiring(window)); - } - - @Test - public void testContinuation() throws Exception { - OnceTrigger triggerA = 
AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger triggerB = AfterWatermark.pastEndOfWindow(); - Trigger aOrFinallyB = triggerA.orFinally(triggerB); - Trigger bOrFinallyA = triggerB.orFinally(triggerA); - assertEquals( - Repeatedly.forever( - triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())), - aOrFinallyB.getContinuationTrigger()); - assertEquals( - Repeatedly.forever( - triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())), - bOrFinallyA.getContinuationTrigger()); - } - - @Test - public void testToString() { - Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2")); - assertEquals("t1.orFinally(t2)", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java deleted file mode 100644 index 6e8930d..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link Repeatedly}. - */ -@RunWith(JUnit4.class) -public class RepeatedlyTest { - - @Mock private Trigger mockTrigger; - private SimpleTriggerTester<IntervalWindow> tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.<Trigger.TriggerContext>any(); - } - - public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception { - MockitoAnnotations.initMocks(this); - tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn); - } - - /** - * Tests that onElement correctly passes the data on to the subtrigger. 
- */ - @Test - public void testOnElement() throws Exception { - setUp(FixedWindows.of(Duration.millis(10))); - tester.injectElements(37); - verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any()); - } - - /** - * Tests that the repeatedly is ready to fire whenever the subtrigger is ready. - */ - @Test - public void testShouldFire() throws Exception { - setUp(FixedWindows.of(Duration.millis(10))); - - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); - - when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any())) - .thenReturn(false); - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); - } - - /** - * Tests that the watermark that guarantees firing is that of the subtrigger. - */ - @Test - public void testFireDeadline() throws Exception { - setUp(FixedWindows.of(Duration.millis(10))); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - Instant arbitraryInstant = new Instant(34957849); - - when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any())) - .thenReturn(arbitraryInstant); - - assertThat( - Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window), - equalTo(arbitraryInstant)); - } - - @Test - public void testContinuation() throws Exception { - Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); - Trigger repeatedly = Repeatedly.forever(trigger); - assertEquals( - Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger()); - assertEquals( - Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()), - repeatedly.getContinuationTrigger().getContinuationTrigger()); - } - - @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - 
Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testRepeatedlyAfterFirstElementCount() throws Exception { - SimpleTriggerTester<GlobalWindow> tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2, 3, 4, 5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyAfterFirstProcessingTime() throws Exception { - SimpleTriggerTester<GlobalWindow> tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyElementCount() throws Exception 
{ - SimpleTriggerTester<GlobalWindow> tester = - TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(5)), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2, 3, 4, 5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyProcessingTime() throws Exception { - SimpleTriggerTester<GlobalWindow> tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - - @Test - public void testToString() { - Trigger trigger = Repeatedly.forever(new StubTrigger() { - @Override - public String toString() { - return "innerTrigger"; - } - }); - - assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java deleted file mode 100644 index 83077f4..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ReshuffleTrigger}. - */ -@RunWith(JUnit4.class) -public class ReshuffleTriggerTest { - - /** Public so that other tests can instantiate {@link ReshuffleTrigger}. 
*/ - public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() { - return new ReshuffleTrigger<>(); - } - - @Test - public void testShouldFire() throws Exception { - TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( - new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); - IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400)); - assertTrue(tester.shouldFire(arbitraryWindow)); - } - - @Test - public void testOnTimer() throws Exception { - TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( - new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); - IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200)); - tester.fireIfShouldFire(arbitraryWindow); - assertFalse(tester.isMarkedFinished(arbitraryWindow)); - } - - @Test - public void testToString() { - Trigger trigger = new ReshuffleTrigger<>(); - assertEquals("ReshuffleTrigger()", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java deleted file mode 100644 index b258a79..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.collect.Lists; -import java.util.List; -import org.joda.time.Instant; - -/** - * No-op {@link OnceTrigger} implementation for testing. - */ -abstract class StubTrigger extends Trigger.OnceTrigger { - /** - * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}. - */ - static StubTrigger named(final String name) { - return new StubTrigger() { - @Override - public String toString() { - return name; - } - }; - } - - protected StubTrigger() { - super(Lists.<Trigger>newArrayList()); - } - - @Override - protected void onOnlyFiring(TriggerContext context) throws Exception { - } - - @Override - public void onElement(OnElementContext c) throws Exception { - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - } - - @Override - public boolean shouldFire(TriggerContext context) throws Exception { - return false; - } - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return null; - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java 
---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java deleted file mode 100644 index cfc03b2..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.List; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link Trigger}. 
- */ -@RunWith(JUnit4.class) -public class TriggerTest { - - @Test - public void testTriggerToString() throws Exception { - assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString()); - assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", - Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString()); - } - - @Test - public void testIsCompatible() throws Exception { - assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); - assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))) - .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); - - assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); - assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null))) - .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); - } - - private static class Trigger1 extends Trigger { - - private Trigger1(List<Trigger> subTriggers) { - super(subTriggers); - } - - @Override - public void onElement(Trigger.OnElementContext c) { } - - @Override - public void onMerge(Trigger.OnMergeContext c) { } - - @Override - protected Trigger getContinuationTrigger( - List<Trigger> continuationTriggers) { - return null; - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return null; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } - } - - private static class Trigger2 extends Trigger { - - private Trigger2(List<Trigger> subTriggers) { - super(subTriggers); - } - - @Override - public void onElement(Trigger.OnElementContext c) { } - - @Override - public void onMerge(Trigger.OnMergeContext c) { } - - @Override - protected Trigger getContinuationTrigger( - List<Trigger> continuationTriggers) { - return null; - } - - @Override - public Instant 
getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return null; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java deleted file mode 100644 index 5fe17ad..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; -import org.apache.beam.sdk.util.state.TimerCallback; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide - * the {@link StateInternals}. - * - * @param <W> The type of windows being used. 
- */ -public class TriggerTester<InputT, W extends BoundedWindow> { - - /** - * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps - * can be conflated. Today, triggers should not observe the element type, so this is the - * only trigger tester that needs to be used. - */ - public static class SimpleTriggerTester<W extends BoundedWindow> - extends TriggerTester<Integer, W> { - - private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { - super(windowingStrategy); - } - - public void injectElements(int... values) throws Exception { - List<TimestampedValue<Integer>> timestampedValues = - Lists.newArrayListWithCapacity(values.length); - for (int value : values) { - timestampedValues.add(TimestampedValue.of(value, new Instant(value))); - } - injectElements(timestampedValues); - } - - public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception { - return new SimpleTriggerTester<>( - windowingStrategy.withAllowedLateness(allowedLateness)); - } - } - - protected final WindowingStrategy<Object, W> windowingStrategy; - - private final TestInMemoryStateInternals<?> stateInternals = - new TestInMemoryStateInternals<Object>(null /* key */); - private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); - private final TriggerContextFactory<W> contextFactory; - private final WindowFn<Object, W> windowFn; - private final ActiveWindowSet<W> activeWindows; - private final Map<W, W> windowToMergeResult; - - /** - * An {@link ExecutableTrigger} built from the {@link Trigger} - * under test. - */ - private final ExecutableTrigger executableTrigger; - - /** - * A map from a window and trigger to whether that trigger is finished for the window. 
- */ - private final Map<W, FinishedTriggers> finishedSets; - - public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger( - Trigger trigger, WindowFn<Object, W> windowFn) - throws Exception { - WindowingStrategy<Object, W> windowingStrategy = - WindowingStrategy.of(windowFn).withTrigger(trigger) - // Merging requires accumulation mode or early firings can break up a session. - // Not currently an issue with the tester (because we never GC) but we don't want - // mystery failures due to violating this need. - .withMode(windowFn.isNonMerging() - ? AccumulationMode.DISCARDING_FIRED_PANES - : AccumulationMode.ACCUMULATING_FIRED_PANES); - - return new SimpleTriggerTester<>(windowingStrategy); - } - - public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger( - Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { - WindowingStrategy<Object, W> strategy = - WindowingStrategy.of(windowFn).withTrigger(trigger) - // Merging requires accumulation mode or early firings can break up a session. - // Not currently an issue with the tester (because we never GC) but we don't want - // mystery failures due to violating this need. - .withMode(windowFn.isNonMerging() - ? AccumulationMode.DISCARDING_FIRED_PANES - : AccumulationMode.ACCUMULATING_FIRED_PANES); - - return new TriggerTester<>(strategy); - } - - protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { - this.windowingStrategy = windowingStrategy; - this.windowFn = windowingStrategy.getWindowFn(); - this.executableTrigger = windowingStrategy.getTrigger(); - this.finishedSets = new HashMap<>(); - - this.activeWindows = - windowFn.isNonMerging() - ? 
new NonMergingActiveWindowSet<W>() - : new MergingActiveWindowSet<W>(windowFn, stateInternals); - this.windowToMergeResult = new HashMap<>(); - - this.contextFactory = - new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows); - } - - /** - * Instructs the trigger to clear its state for the given window. - */ - public void clearState(W window) throws Exception { - executableTrigger.invokeClear(contextFactory.base(window, - new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window))); - } - - /** - * Asserts that the trigger has actually cleared all of its state for the given window. Since - * the trigger under test is the root, this makes the assert for all triggers regardless - * of their position in the trigger tree. - */ - public void assertCleared(W window) { - for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) { - if (untypedNamespace instanceof WindowAndTriggerNamespace) { - @SuppressWarnings("unchecked") - WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace; - if (namespace.getWindow().equals(window)) { - Set<?> tagsInUse = stateInternals.getTagsInUse(namespace); - assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty()); - } - } - } - } - - /** - * Returns {@code true} if the {@link Trigger} under test is finished for the given window. - */ - public boolean isMarkedFinished(W window) { - FinishedTriggers finishedSet = finishedSets.get(window); - if (finishedSet == null) { - return false; - } - - return finishedSet.isFinished(executableTrigger); - } - - private StateNamespace windowNamespace(W window) { - return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window)); - } - - /** - * Advance the input watermark to the specified time, then advance the output watermark as far as - * possible. 
- */ - public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 - timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); - } - - /** Advance the processing time to the specified time. */ - public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 - timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); - } - - /** - * Inject all the timestamped values (after passing through the window function) as if they - * arrived in a single chunk of a bundle (or work-unit). - */ - @SafeVarargs - public final void injectElements(TimestampedValue<InputT>... values) throws Exception { - injectElements(Arrays.asList(values)); - } - - public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception { - for (TimestampedValue<InputT> value : values) { - WindowTracing.trace("TriggerTester.injectElements: {}", value); - } - - List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size()); - - for (TimestampedValue<InputT> input : values) { - try { - InputT value = input.getValue(); - Instant timestamp = input.getTimestamp(); - Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( - windowFn, value, timestamp, GlobalWindow.INSTANCE)); - - for (W window : assignedWindows) { - activeWindows.addActiveForTesting(window); - - // Today, triggers assume onTimer firing at the watermark time, whether or not they - // explicitly set the timer themselves. So this tester must set it. 
- timerInternals.setTimer( - TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME)); - } - - windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - for (WindowedValue<InputT> windowedValue : windowedValues) { - for (BoundedWindow untypedWindow : windowedValue.getWindows()) { - // SDK is responsible for type safety - @SuppressWarnings("unchecked") - W window = mergeResult((W) untypedWindow); - - Trigger.OnElementContext context = contextFactory.createOnElementContext(window, - new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), - executableTrigger, getFinishedSet(window)); - - if (!context.trigger().isFinished()) { - executableTrigger.invokeOnElement(context); - } - } - } - } - - public boolean shouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( - window, - new TestTimers(windowNamespace(window)), - executableTrigger, getFinishedSet(window)); - executableTrigger.getSpec().prefetchShouldFire(context.state()); - return executableTrigger.invokeShouldFire(context); - } - - public void fireIfShouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( - window, - new TestTimers(windowNamespace(window)), - executableTrigger, getFinishedSet(window)); - - executableTrigger.getSpec().prefetchShouldFire(context.state()); - if (executableTrigger.invokeShouldFire(context)) { - executableTrigger.getSpec().prefetchOnFire(context.state()); - executableTrigger.invokeOnFire(context); - if (context.trigger().isFinished()) { - activeWindows.remove(window); - executableTrigger.invokeClear(context); - } - } - } - - public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) { - getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value); - } - - /** - * Invokes merge from the {@link 
WindowFn} a single time and passes the resulting merge - * events on to the trigger under test. Does not persist the fact that merging happened, - * since it is just to test the trigger's {@code OnMerge} method. - */ - public final void mergeWindows() throws Exception { - windowToMergeResult.clear(); - activeWindows.merge(new MergeCallback<W>() { - @Override - public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} - - @Override - public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { - List<W> activeToBeMerged = new ArrayList<W>(); - for (W window : toBeMerged) { - windowToMergeResult.put(window, mergeResult); - if (activeWindows.isActive(window)) { - activeToBeMerged.add(window); - } - } - Map<W, FinishedTriggers> mergingFinishedSets = - Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); - for (W oldWindow : activeToBeMerged) { - mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow)); - } - executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, - new TestTimers(windowNamespace(mergeResult)), executableTrigger, - getFinishedSet(mergeResult), mergingFinishedSets)); - timerInternals.setTimer(TimerData.of( - windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME)); - } - }); - } - - public W mergeResult(W window) { - W result = windowToMergeResult.get(window); - return result == null ? 
window : result; - } - - private FinishedTriggers getFinishedSet(W window) { - FinishedTriggers finishedSet = finishedSets.get(window); - if (finishedSet == null) { - finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); - finishedSets.put(window, finishedSet); - } - return finishedSet; - } - - private static class TestAssignContext<W extends BoundedWindow> - extends WindowFn<Object, W>.AssignContext { - private Object element; - private Instant timestamp; - private BoundedWindow window; - - public TestAssignContext( - WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { - windowFn.super(); - this.element = element; - this.timestamp = timestamp; - this.window = window; - } - - @Override - public Object element() { - return element; - } - - @Override - public Instant timestamp() { - return timestamp; - } - - @Override - public BoundedWindow window() { - return window; - } - } - - private class TestTimers implements Timers { - private final StateNamespace namespace; - - public TestTimers(StateNamespace namespace) { - checkArgument(namespace instanceof WindowNamespace); - this.namespace = namespace; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public Instant currentProcessingTime() { - return timerInternals.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timerInternals.currentSynchronizedProcessingTime(); - } - - @Override - public Instant currentEventTime() { - return timerInternals.currentInputWatermarkTime(); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java new file mode 100644 index 0000000..907292c --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link AfterAllStateMachine}. + */ +@RunWith(JUnit4.class) +public class AfterAllStateMachineTest { + + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + + @Test + public void testT1FiresFirst() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterAllStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(1), + AfterPaneStateMachine.elementCountAtLeast(2)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testT2FiresFirst() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterAllStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(2), + AfterPaneStateMachine.elementCountAtLeast(1)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); 
+ } + + /** + * Tests that the AfterAll properly unsets finished bits when a merge causes it to become + * unfinished. + */ + @Test + public void testOnMergeRewinds() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + AfterAllStateMachine.of( + AfterWatermarkStateMachine.pastEndOfWindow(), + AfterPaneStateMachine.elementCountAtLeast(1)), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + + // Finish the AfterAll in the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merge them; the AfterAll should not be finished + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.isMarkedFinished(mergedWindow)); + + // Confirm that we are back on the first trigger by probing that it is not ready to fire + // after an element (with merging) + tester.injectElements(3); + tester.mergeWindows(); + assertFalse(tester.shouldFire(mergedWindow)); + + // Fire the AfterAll in the merged window + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + + // Confirm that we are on the second trigger by probing + tester.injectElements(2); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + tester.injectElements(2); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + } + + @Test + public void 
testToString() { + TriggerStateMachine trigger = + AfterAllStateMachine.of( + StubTriggerStateMachine.named("t1"), StubTriggerStateMachine.named("t2")); + assertEquals("AfterAll.of(t1, t2)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java new file mode 100644 index 0000000..4fae8f1 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterEachStateMachine}. + */ +@RunWith(JUnit4.class) +public class AfterEachStateMachineTest { + + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + /** + * Tests that the {@link AfterEachStateMachine} trigger fires and finishes the first trigger then + * the second. 
+ */ + @Test + public void testAfterEachInSequence() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)) + .orFinally(AfterPaneStateMachine.elementCountAtLeast(3)), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(5)) + .orFinally(AfterWatermarkStateMachine.pastEndOfWindow())), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // AfterCount(2) not ready + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + // AfterCount(2) ready, not finished + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // orFinally(AfterCount(3)) ready and will finish the first + tester.injectElements(1, 2, 3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Now running as the second trigger + assertFalse(tester.shouldFire(window)); + // This quantity of elements would fire and finish if it were erroneously still the first + tester.injectElements(1, 2, 3, 4); + assertFalse(tester.shouldFire(window)); + + // Now fire once + tester.injectElements(5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // This time advance the watermark to finish the whole mess. 
+ tester.advanceInputWatermark(new Instant(10)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testToString() { + TriggerStateMachine trigger = AfterEachStateMachine.inOrder( + StubTriggerStateMachine.named("t1"), + StubTriggerStateMachine.named("t2"), + StubTriggerStateMachine.named("t3")); + + assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java new file mode 100644 index 0000000..453c8ff --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link AfterFirstStateMachine}. + */ +@RunWith(JUnit4.class) +public class AfterFirstStateMachineTest { + + @Mock private OnceTriggerStateMachine mockTrigger1; + @Mock private OnceTriggerStateMachine mockTrigger2; + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + private static TriggerStateMachine.TriggerContext anyTriggerContext() { + return Mockito.<TriggerStateMachine.TriggerContext>any(); + } + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testNeitherShouldFireFixedWindows() throws Exception { + tester = + TriggerStateMachineTester.forTrigger( + AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); + + assertFalse(tester.shouldFire(window)); // should not fire + assertFalse(tester.isMarkedFinished(window)); // not finished + } + + @Test + public 
void testOnlyT1ShouldFireFixedWindows() throws Exception { + tester = + TriggerStateMachineTester.forTrigger( + AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), + FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); + + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testOnlyT2ShouldFireFixedWindows() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); // now finished + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testBothShouldFireFixedWindows() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); + tester.injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); + + when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // should fire + + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that if the first trigger rewinds to be non-finished in the merged window, + * then it becomes the currently active trigger 
again, with real triggers. + */ + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + AfterFirstStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(5), + AfterWatermarkStateMachine.pastEndOfWindow()), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + // Finished the AfterFirst in the first window + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Set up second window where it is not done + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now adding 3 more makes the AfterFirst ready to fire + tester.injectElements(1, 2, 3, 4, 5); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java new file mode 100644 index 0000000..4240174 --- /dev/null +++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link AfterPaneStateMachine}. + */ +@RunWith(JUnit4.class) +public class AfterPaneStateMachineTest { + + SimpleTriggerStateMachineTester<IntervalWindow> tester; + /** + * Tests that the trigger does fire when enough elements are in a window, and that it only + * fires that window (no leakage). 
+ */ + @Test + public void testAfterPaneElementCountFixedWindows() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterPaneStateMachine.elementCountAtLeast(2), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1); // [0, 10) + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); // [0, 10) + tester.injectElements(11); // [10, 20) + + assertTrue(tester.shouldFire(window)); // ready to fire + tester.fireIfShouldFire(window); // and finished + assertTrue(tester.isMarkedFinished(window)); + + // But don't finish the other window + assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20)))); + } + + @Test + public void testClear() throws Exception { + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterPaneStateMachine.elementCountAtLeast(2), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1, 2, 3); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + tester.clearState(window); + tester.assertCleared(window); + } + + @Test + public void testAfterPaneElementCountSessions() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterPaneStateMachine.elementCountAtLeast(2), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements( + 1, // in [1, 11) + 2); // in [2, 12) + + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11)))); + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12)))); + + tester.mergeWindows(); + + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12)); + assertTrue(tester.shouldFire(mergedWindow)); + tester.fireIfShouldFire(mergedWindow); + assertTrue(tester.isMarkedFinished(mergedWindow)); + + // Because we closed the previous window, we don't have it around to merge with. 
So there + // will be a new FIRE_AND_FINISH result. + tester.injectElements( + 7, // in [7, 17) + 9); // in [9, 19) + + tester.mergeWindows(); + + IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19)); + assertTrue(tester.shouldFire(newMergedWindow)); + tester.fireIfShouldFire(newMergedWindow); + assertTrue(tester.isMarkedFinished(newMergedWindow)); + } + + @Test + public void testToString() { + TriggerStateMachine trigger = AfterPaneStateMachine.elementCountAtLeast(5); + assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java new file mode 100644 index 0000000..9fbf801 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link AfterProcessingTimeStateMachine}. + */ +@RunWith(JUnit4.class) +public class AfterProcessingTimeStateMachineTest { + + /** + * Tests the basic property that the trigger does wait for processing time to be + * far enough advanced. 
+ */ + @Test + public void testAfterProcessingTimeFixedWindows() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + + // Timer at 15 + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceProcessingTime(new Instant(12)); + assertFalse(tester.shouldFire(firstWindow)); + + // Load up elements in the next window, timer at 17 for them + tester.injectElements(11, 12, 13); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + assertFalse(tester.shouldFire(secondWindow)); + + // Not quite time to fire + tester.advanceProcessingTime(new Instant(14)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first + tester.injectElements(2, 3); + + // Advance past the first timer and fire, finishing the first window + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.isMarkedFinished(firstWindow)); + + // The next window fires and finishes now + tester.advanceProcessingTime(new Instant(18)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(secondWindow); + assertTrue(tester.isMarkedFinished(secondWindow)); + } + + /** + * Tests that when windows merge, if the trigger is waiting for "N millis after the first + * element" that it is relative to the earlier of the two merged windows. 
+ */ + @Test + public void testClear() throws Exception { + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1, 2, 3); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + tester.clearState(window); + tester.assertCleared(window); + } + + @Test + public void testAfterProcessingTimeWithMergingWindow() throws Exception { + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.advanceProcessingTime(new Instant(10)); + tester.injectElements(1); // in [1, 11), timer for 15 + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.advanceProcessingTime(new Instant(12)); + tester.injectElements(3); // in [3, 13), timer for 17 + IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); + assertFalse(tester.shouldFire(secondWindow)); + + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); + + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Basic test of compatibility check between identical triggers. 
+ */ + @Test + public void testCompatibilityIdentical() throws Exception { + TriggerStateMachine t1 = AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(1L)); + TriggerStateMachine t2 = AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(1L)); + assertTrue(t1.isCompatible(t2)); + } + + @Test + public void testToString() { + TriggerStateMachine trigger = AfterProcessingTimeStateMachine.pastFirstElementInPane(); + assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString()); + } + + @Test + public void testWithDelayToString() { + TriggerStateMachine trigger = AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(5)); + + assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)", + trigger.toString()); + } + + @Test + public void testBuiltUpToString() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow() + .withLateFirings(AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(10))); + + String expected = "AfterWatermark.pastEndOfWindow()" + + ".withLateFirings(AfterProcessingTime" + + ".pastFirstElementInPane()" + + ".plusDelayOf(10 minutes))"; + + assertEquals(expected, trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java new file mode 100644 index 0000000..140bd62 --- /dev/null +++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link AfterSynchronizedProcessingTimeStateMachine}. 
+ */ +@RunWith(JUnit4.class) +public class AfterSynchronizedProcessingTimeStateMachineTest { + + private TriggerStateMachine underTest = new AfterSynchronizedProcessingTimeStateMachine(); + + @Test + public void testAfterProcessingTimeWithFixedWindows() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + + // Timer at 15 + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceProcessingTime(new Instant(12)); + assertFalse(tester.shouldFire(firstWindow)); + + // Load up elements in the next window, timer at 17 for them + tester.injectElements(11, 12, 13); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + assertFalse(tester.shouldFire(secondWindow)); + + // Not quite time to fire + tester.advanceProcessingTime(new Instant(14)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first + tester.injectElements(2, 3); + + // Advance past the first timer and fire, finishing the first window + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.isMarkedFinished(firstWindow)); + + // The next window fires and finishes now + tester.advanceProcessingTime(new Instant(18)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(secondWindow); + assertTrue(tester.isMarkedFinished(secondWindow)); + } + + @Test + public void testAfterProcessingTimeWithMergingWindow() throws 
Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger( + AfterProcessingTimeStateMachine + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + Sessions.withGapDuration(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + tester.injectElements(1); // in [1, 11), timer for 15 + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.advanceProcessingTime(new Instant(12)); + tester.injectElements(3); // in [3, 13), timer for 17 + IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); + assertFalse(tester.shouldFire(secondWindow)); + + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); + + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(mergedWindow)); + } +}