http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 1c6cd30..b526305 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -17,88 +17,545 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.GroupingState; import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.hamcrest.Matchers; +import 
org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.junit.runners.Suite; /** - * Tests for {@link InMemoryStateInternals}. This is based on {@link StateInternalsTest}. + * Tests for {@link InMemoryStateInternals}. */ -@RunWith(Suite.class) -@Suite.SuiteClasses({ - InMemoryStateInternalsTest.StandardStateInternalsTests.class, - InMemoryStateInternalsTest.OtherTests.class -}) +@RunWith(JUnit4.class) public class InMemoryStateInternalsTest { + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - /** - * A standard StateInternals test. - */ - @RunWith(JUnit4.class) - public static class StandardStateInternalsTests extends StateInternalsTest { - @Override - protected StateInternals createStateInternals() { - return new InMemoryStateInternals<>("dummyKey"); - } + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<CombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + private static final StateTag<BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<SetState<String>> STRING_SET_ADDR = + StateTags.set("stringSet", StringUtf8Coder.of()); + private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = + StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST); + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + + InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); + + @Test + public void testValue() throws Exception { + ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); + assertThat( + underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), + Matchers.not(Matchers.sameInstance(value))); + + assertThat(value.read(), Matchers.nullValue()); + value.write("hello"); + assertThat(value.read(), equalTo("hello")); + value.write("world"); + assertThat(value.read(), equalTo("world")); + + value.clear(); + assertThat(value.read(), Matchers.nullValue()); + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag gets both the contents + assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag gets both the contents + assertThat(bag3.read(), containsInAnyOrder("Hello", 
"World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testSet() throws Exception { + SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR)))); + + // empty + assertThat(value.read(), Matchers.emptyIterable()); + assertFalse(value.contains("A").read()); + + // add + value.add("A"); + value.add("B"); + value.add("A"); + assertFalse(value.addIfAbsent("B").read()); + assertThat(value.read(), containsInAnyOrder("A", "B")); + + // remove + value.remove("A"); + assertThat(value.read(), containsInAnyOrder("B")); + value.remove("C"); + assertThat(value.read(), containsInAnyOrder("B")); + + // contains + assertFalse(value.contains("A").read()); + assertTrue(value.contains("B").read()); + value.add("C"); + value.add("D"); + + // readLater + assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D")); + SetState<String> later = value.readLater(); + assertThat(later.read(), hasItems("C", "D")); + assertFalse(later.contains("A").read()); + + // clear + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), Matchers.sameInstance(value)); + + } + + @Test + public void testSetIsEmpty() throws Exception { + SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeSetIntoSource() throws Exception { + SetState<String> set1 = underTest.state(NAMESPACE_1, 
STRING_SET_ADDR); + SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); + + set1.add("Hello"); + set2.add("Hello"); + set2.add("World"); + set1.add("!"); + + StateMerging.mergeSets(Arrays.asList(set1, set2), set1); + + // Reading the merged set gets both the contents + assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(set2.read(), Matchers.emptyIterable()); } - /** - * A specific test of InMemoryStateInternals. - */ - @RunWith(JUnit4.class) - public static class OtherTests { - - private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns"); - - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<SetState<String>> STRING_SET_ADDR = - StateTags.set("stringSet", StringUtf8Coder.of()); - private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = - StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - StateInternals underTest = new InMemoryStateInternals<>("dummyKey"); - - @Test - public void testSameInstance() { - assertSameInstance(STRING_VALUE_ADDR); - assertSameInstance(SUM_INTEGER_ADDR); - 
assertSameInstance(STRING_BAG_ADDR); - assertSameInstance(STRING_SET_ADDR); - assertSameInstance(STRING_MAP_ADDR); - assertSameInstance(WATERMARK_EARLIEST_ADDR); + @Test + public void testMergeSetIntoNewNamespace() throws Exception { + SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); + SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR); + + set1.add("Hello"); + set2.add("Hello"); + set2.add("World"); + set1.add("!"); + + StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3); + + // Reading the merged set gets both the contents + assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(set1.read(), Matchers.emptyIterable()); + assertThat(set2.read(), Matchers.emptyIterable()); + } + + // for testMap + private static class MapEntry<K, V> implements Map.Entry<K, V> { + private K key; + private V value; + + private MapEntry(K key, V value) { + this.key = key; + this.value = value; + } + + static <K, V> Map.Entry<K, V> of(K k, V v) { + return new MapEntry<>(k, v); + } + + public final K getKey() { + return key; + } + public final V getValue() { + return value; + } + + public final String toString() { + return key + "=" + value; + } + + public final int hashCode() { + return Objects.hashCode(key) ^ Objects.hashCode(value); + } + + public final V setValue(V newValue) { + V oldValue = value; + value = newValue; + return oldValue; } - private <T extends State> void assertSameInstance(StateTag<T> address) { - assertThat(underTest.state(NAMESPACE, address), - Matchers.sameInstance(underTest.state(NAMESPACE, address))); + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof Map.Entry) { + Map.Entry<?, ?> e = (Map.Entry<?, ?>) o; + if (Objects.equals(key, e.getKey()) + && Objects.equals(value, e.getValue())) { + return true; + } + } + return false; } } + @Test + public void testMap() throws 
Exception { + MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR)))); + + // put + assertThat(value.entries().read(), Matchers.emptyIterable()); + value.put("A", 1); + value.put("B", 2); + value.put("A", 11); + assertThat(value.putIfAbsent("B", 22).read(), equalTo(2)); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), + MapEntry.of("B", 2))); + + // remove + value.remove("A"); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); + value.remove("C"); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); + + // get + assertNull(value.get("A").read()); + assertThat(value.get("B").read(), equalTo(2)); + value.put("C", 3); + value.put("D", 4); + assertThat(value.get("C").read(), equalTo(3)); + + // iterate + value.put("E", 5); + value.remove("C"); + assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E")); + assertThat(value.values().read(), containsInAnyOrder(2, 4, 5)); + assertThat( + value.entries().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + + // readLater + assertThat(value.get("B").readLater().read(), equalTo(2)); + assertNull(value.get("A").readLater().read()); + assertThat( + value.entries().readLater().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + + // clear + value.clear(); + assertThat(value.entries().read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testCombiningValue() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on 
the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), equalTo(0)); + value.add(2); + assertThat(value.read(), equalTo(2)); + + value.add(3); + assertThat(value.read(), equalTo(5)); + + value.clear(); + assertThat(value.read(), equalTo(0)); + assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), equalTo(11)); + assertThat(value2.read(), equalTo(10)); + + // Merging clears the old values and updates the result value. 
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), equalTo(21)); + assertThat(value2.read(), equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), equalTo(0)); + assertThat(value2.read(), equalTo(0)); + assertThat(value3.read(), equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + 
+ value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), equalTo(new Instant(2000))); + assertThat(value2.read(), equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // Merging clears the old values and updates the result value. + assertThat(value3.read(), equalTo(new Instant(5000))); + assertThat(value1.read(), equalTo(null)); + assertThat(value2.read(), equalTo(null)); + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 959909e..a2f6acc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -17,22 +17,18 @@ */ package org.apache.beam.runners.core; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import java.util.Collection; import java.util.concurrent.Executors; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -46,27 +42,19 @@ import org.junit.Test; /** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. 
*/ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { private static class SomeFn extends DoFn<Integer, String> { - private final int numOutputsPerProcessCall; private final Duration sleepBeforeEachOutput; - private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) { - this.numOutputsPerProcessCall = numOutputsPerProcessCall; + private SomeFn(Duration sleepBeforeEachOutput) { this.sleepBeforeEachOutput = sleepBeforeEachOutput; } @ProcessElement - public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) + public void process(ProcessContext context, OffsetRangeTracker tracker) throws Exception { - for (long i = tracker.currentRestriction().getFrom(), numIterations = 1; - tracker.tryClaim(i); - ++i, ++numIterations) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { Thread.sleep(sleepBeforeEachOutput.getMillis()); context.output("" + i); - if (numIterations == numOutputsPerProcessCall) { - return resume(); - } } - return stop(); } @GetInitialRestriction @@ -76,8 +64,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result - runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) { - SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement); + runTest(int count, Duration sleepPerElement) { + SomeFn fn = new SomeFn(sleepPerElement); SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker> invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, @@ -105,15 +93,14 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { return invoker.invokeProcessElement( DoFnInvokers.invokerFor(fn), - WindowedValue.of(totalNumOutputs, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), - new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs))); + 
WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + new OffsetRangeTracker(new OffsetRange(0, count))); } @Test public void testInvokeProcessElementOutputBounded() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(10000, Integer.MAX_VALUE, Duration.ZERO); - assertFalse(res.getContinuation().shouldResume()); + runTest(10000, Duration.ZERO); OffsetRange residualRange = res.getResidualRestriction(); // Should process the first 100 elements. assertEquals(1000, residualRange.getFrom()); @@ -123,8 +110,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementTimeBounded() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(10000, Integer.MAX_VALUE, Duration.millis(100)); - assertFalse(res.getContinuation().shouldResume()); + runTest(10000, Duration.millis(100)); OffsetRange residualRange = res.getResidualRestriction(); // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce // that precisely. Just test that it's not egregiously off. 
@@ -134,18 +120,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } @Test - public void testInvokeProcessElementVoluntaryReturnStop() throws Exception { + public void testInvokeProcessElementVoluntaryReturn() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(5, Integer.MAX_VALUE, Duration.millis(100)); - assertFalse(res.getContinuation().shouldResume()); + runTest(5, Duration.millis(100)); assertNull(res.getResidualRestriction()); } - - @Test - public void testInvokeProcessElementVoluntaryReturnResume() throws Exception { - SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(10, 5, Duration.millis(100)); - assertTrue(res.getContinuation().shouldResume()); - assertEquals(new OffsetRange(5, 10), res.getResidualRestriction()); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 4f13af1..9e71300 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import 
org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -68,7 +67,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; @@ -142,40 +140,7 @@ public class ReduceFnRunnerTest { } }) .when(mockTrigger).onFire(anyTriggerContext()); - } - - /** - * Tests that a processing time timer does not cause window GC. - */ - @Test - public void testProcessingTimeTimerDoesNotGc() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.ZERO) - .withTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceProcessingTime(new Instant(5000)); - injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 - injectElement(tester, 5); - - tester.advanceProcessingTime(new Instant(10000)); - - tester.assertHasOnlyGlobalAndStateFor( - new IntervalWindow(new Instant(0), new Instant(100))); - - assertThat( - tester.extractOutput(), - contains( - isSingleWindowedValue( - equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0)))); - } + } @Test public void testOnElementBufferingDiscarding() throws Exception { @@ -246,52 +211,6 @@ public class ReduceFnRunnerTest { 
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); } - /** - * Tests that with the default trigger we will not produce two ON_TIME panes, even - * if there are two outputs that are both candidates. - */ - @Test - public void testOnlyOneOnTimePane() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTrigger(DefaultTrigger.of()) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(100)); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceInputWatermark(new Instant(0)); - - int value1 = 1; - int value2 = 3; - - // A single element that should be in the ON_TIME output - tester.injectElements( - TimestampedValue.of(value1, new Instant(1))); - - // Should fire ON_TIME - tester.advanceInputWatermark(new Instant(10)); - - // The DefaultTrigger should cause output labeled LATE, even though it does not have to be - // labeled as such. - tester.injectElements( - TimestampedValue.of(value2, new Instant(3))); - - List<WindowedValue<Integer>> output = tester.extractOutput(); - assertEquals(2, output.size()); - - assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1))); - assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2))); - - assertThat( - output.get(0), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); - assertThat( - output.get(1), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1))); - } - @Test public void testOnElementCombiningDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. 
@@ -331,76 +250,6 @@ public class ReduceFnRunnerTest { } /** - * Tests that when a processing time timer comes in after a window is expired - * it is just ignored. - */ - @Test - public void testLateProcessingTimeTimer() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.ZERO) - .withTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceProcessingTime(new Instant(5000)); - injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 - injectElement(tester, 5); - - // After this advancement, the window is expired and only the GC process - // should be allowed to touch it - tester.advanceInputWatermarkNoTimers(new Instant(100)); - - // This should not output - tester.advanceProcessingTime(new Instant(6000)); - - assertThat(tester.extractOutput(), emptyIterable()); - } - - /** - * Tests that when a processing time timer comes in after a window is expired - * but in the same bundle it does not cause a spurious output. 
- */ - @Test - public void testCombiningAccumulatingProcessingTime() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.ZERO) - .withTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceProcessingTime(new Instant(5000)); - injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 - injectElement(tester, 5); - - tester.advanceInputWatermarkNoTimers(new Instant(100)); - tester.advanceProcessingTimeNoTimers(new Instant(5010)); - - // Fires the GC/EOW timer at the same time as the processing time timer. - tester.fireTimers( - new IntervalWindow(new Instant(0), new Instant(100)), - TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)), - TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010))); - - assertThat( - tester.extractOutput(), - contains( - isSingleWindowedValue( - equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); - } - - /** * Tests that the garbage collection time for a fixed window does not overflow the end of time. */ @Test @@ -467,67 +316,6 @@ public class ReduceFnRunnerTest { assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55)))); } - /** - * Tests that when a processing time timers comes in after a window is expired - * and GC'd it does not cause a spurious output. 
- */ - @Test - public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.ZERO) - .withTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceProcessingTime(new Instant(5000)); - injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 - injectElement(tester, 5); - - tester.advanceInputWatermark(new Instant(100)); - tester.advanceProcessingTime(new Instant(5011)); - - assertThat( - tester.extractOutput(), - contains( - isSingleWindowedValue( - equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); - } - - /** - * Tests that if end-of-window and GC timers come in together, that the pane is correctly - * marked as final. 
- */ - @Test - public void testCombiningAccumulatingEventTime() throws Exception { - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(1)) - .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 - injectElement(tester, 5); - - tester.advanceInputWatermark(new Instant(1000)); - - assertThat( - tester.extractOutput(), - contains( - isSingleWindowedValue( - equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); - } - - @Test public void testOnElementCombiningAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode. @@ -1501,166 +1289,6 @@ public class ReduceFnRunnerTest { } /** - * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY. 
- */ - @Test - public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception { - - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withTrigger( - AfterEach.<IntervalWindow>inOrder( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(25))))) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(100)) - .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) - .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceInputWatermark(new Instant(0)); - tester.advanceProcessingTime(new Instant(0)); - - // Processing time timer for 5 - tester.injectElements( - TimestampedValue.of(1, new Instant(1)), - TimestampedValue.of(1, new Instant(3)), - TimestampedValue.of(1, new Instant(7)), - TimestampedValue.of(1, new Instant(5))); - - // Should fire early pane - tester.advanceProcessingTime(new Instant(6)); - - // Should not fire empty on time pane - tester.advanceInputWatermark(new Instant(11)); - - // Should fire final GC pane - tester.advanceInputWatermark(new Instant(10 + 100)); - List<WindowedValue<Integer>> output = tester.extractOutput(); - assertEquals(2, output.size()); - - assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10)); - assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10)); - - assertThat( - output.get(0), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); - assertThat( - output.get(1), - 
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0))); - } - - /** - * Test that it fires an empty on-time isFinished pane when OnTimeBehavior is FIRE_ALWAYS - * and ClosingBehavior is FIRE_IF_NON_EMPTY. - * - * <p>This is a test just for backward compatibility. - */ - @Test - public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception { - - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withTrigger(AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(1))) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(0)) - .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceInputWatermark(new Instant(0)); - tester.advanceProcessingTime(new Instant(0)); - - tester.injectElements( - TimestampedValue.of(1, new Instant(1))); - - // Should fire empty on time isFinished pane - tester.advanceInputWatermark(new Instant(11)); - - List<WindowedValue<Integer>> output = tester.extractOutput(); - assertEquals(2, output.size()); - - assertThat( - output.get(0), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); - assertThat( - output.get(1), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0))); - } - - /** - * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY - * and when receiving late data. 
- */ - @Test - public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception { - - WindowingStrategy<?, IntervalWindow> strategy = - WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withTrigger( - AfterEach.<IntervalWindow>inOrder( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(25))))) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(100)) - .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); - - ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); - - tester.advanceInputWatermark(new Instant(0)); - tester.advanceProcessingTime(new Instant(0)); - - // Processing time timer for 5 - tester.injectElements( - TimestampedValue.of(1, new Instant(1)), - TimestampedValue.of(1, new Instant(3)), - TimestampedValue.of(1, new Instant(7)), - TimestampedValue.of(1, new Instant(5))); - - // Should fire early pane - tester.advanceProcessingTime(new Instant(6)); - - // Should not fire empty on time pane - tester.advanceInputWatermark(new Instant(11)); - - // Processing late data, and should fire late pane - tester.injectElements( - TimestampedValue.of(1, new Instant(9))); - tester.advanceProcessingTime(new Instant(6 + 25 + 1)); - - List<WindowedValue<Integer>> output = tester.extractOutput(); - assertEquals(2, output.size()); - - assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10)); - assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(5, 9, 0, 10)); - - assertThat( - output.get(0), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); - assertThat( - 
output.get(1), - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0))); - } - - /** * Tests for processing time firings after the watermark passes the end of the window. * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late * when the on-time pane is non-empty. http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 7ca96b9..7f83eae 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -318,19 +318,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } @SafeVarargs - public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) { - assertHasOnlyGlobalAndAllowedTags( - ImmutableSet.copyOf(expectedWindows), - ImmutableSet.<StateTag<?>>of( - ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(), - TriggerStateMachineRunner.FINISHED_BITS_TAG, - PaneInfoTracker.PANE_INFO_TAG, - WatermarkHold.watermarkHoldTagForTimestampCombiner( - objectStrategy.getTimestampCombiner()), - WatermarkHold.EXTRA_HOLD_TAG)); - } - - @SafeVarargs public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... 
expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), @@ -433,10 +420,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return result; } - public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception { - timerInternals.advanceInputWatermark(newInputWatermark); - } - /** * Advance the input watermark to the specified time, firing any timers that should * fire. Then advance the output watermark as far as possible. @@ -468,10 +451,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { runner.persist(); } - public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception { - timerInternals.advanceProcessingTime(newProcessingTime); - } - /** * If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark * to the given value. Otherwise throw. @@ -529,8 +508,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { for (TimestampedValue<InputT> value : values) { WindowTracing.trace("TriggerTester.injectElements: {}", value); } - - Iterable<WindowedValue<InputT>> inputs = + ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); + runner.processElements( Iterables.transform( Arrays.asList(values), new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() { @@ -548,12 +527,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { throw new RuntimeException(e); } } - }); - - ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - runner.processElements( - new LateDataDroppingDoFnRunner.LateDataFilter(objectStrategy, timerInternals) - .filter(KEY, inputs)); + })); // Persist after each bundle. 
runner.persist(); @@ -561,27 +535,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception { ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - ArrayList<TimerData> timers = new ArrayList<>(1); + ArrayList timers = new ArrayList(1); timers.add( TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); runner.onTimers(timers); runner.persist(); } - public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception { - ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - ArrayList<TimerData> timerData = new ArrayList<>(timers.length); - for (TimestampedValue<TimeDomain> timer : timers) { - timerData.add( - TimerData.of( - StateNamespaces.window(windowFn.windowCoder(), window), - timer.getTimestamp(), - timer.getValue())); - } - runner.onTimers(timerData); - runner.persist(); - } - /** * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output * elements. 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 7449af3..d242431 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -17,9 +17,6 @@ */ package org.apache.beam.runners.core; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItems; @@ -38,15 +35,16 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.Executors; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; +import org.apache.beam.runners.core.construction.ElementAndRestriction; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -114,7 +111,9 @@ public class SplittableParDoProcessFnTest { private static class ProcessFnTester< InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> implements AutoCloseable { - private final DoFnTester<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> tester; + private final DoFnTester< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + tester; private Instant currentProcessingTime; private InMemoryTimerInternals timerInternals; @@ -195,13 +194,14 @@ public class SplittableParDoProcessFnTest { void startElement(InputT element, RestrictionT restriction) throws Exception { startElement( WindowedValue.of( - KV.of(element, restriction), + ElementAndRestriction.of(element, restriction), currentProcessingTime, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); } - void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) throws Exception { + void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue) + throws Exception { tester.processElement( KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue))); } @@ -223,7 +223,8 @@ public class SplittableParDoProcessFnTest { return false; } tester.processElement( - KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem("key", timers)); + KeyedWorkItems.<String, ElementAndRestriction<InputT, 
RestrictionT>>timersWorkItem( + "key", timers)); return true; } @@ -308,7 +309,7 @@ public class SplittableParDoProcessFnTest { MAX_BUNDLE_DURATION); tester.startElement( WindowedValue.of( - KV.of(42, new SomeRestriction()), + ElementAndRestriction.of(42, new SomeRestriction()), base, Collections.singletonList(w), PaneInfo.ON_TIME_AND_ONLY_FIRING)); @@ -368,71 +369,16 @@ public class SplittableParDoProcessFnTest { assertEquals(null, tester.getWatermarkHold()); } - /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */ - private static class SelfInitiatedResumeFn extends DoFn<Integer, String> { - @ProcessElement - public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) { - c.output(c.element().toString()); - return resume().withResumeDelay(Duration.standardSeconds(5)); - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer elem) { - return new SomeRestriction(); - } - } - - @Test - public void testResumeSetsTimer() throws Exception { - DoFn<Integer, String> fn = new SelfInitiatedResumeFn(); - Instant base = Instant.now(); - ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = - new ProcessFnTester<>( - base, - fn, - BigEndianIntegerCoder.of(), - SerializableCoder.of(SomeRestriction.class), - MAX_OUTPUTS_PER_BUNDLE, - MAX_BUNDLE_DURATION); - - tester.startElement(42, new SomeRestriction()); - assertThat(tester.takeOutputElements(), contains("42")); - - // Should resume after 5 seconds: advancing by 3 seconds should have no effect. - assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); - assertTrue(tester.takeOutputElements().isEmpty()); - - // 6 seconds should be enough should invoke the fn again. 
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); - assertThat(tester.takeOutputElements(), contains("42")); - - // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect. - assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); - assertTrue(tester.takeOutputElements().isEmpty()); - - // 6 seconds should again be enough. - assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); - assertThat(tester.takeOutputElements(), contains("42")); - } - - /** A splittable {@link DoFn} that generates the sequence [init, init + total). */ + /** + * A splittable {@link DoFn} that generates the sequence [init, init + total). + */ private static class CounterFn extends DoFn<Integer, String> { - private final int numOutputsPerCall; - - public CounterFn(int numOutputsPerCall) { - this.numOutputsPerCall = numOutputsPerCall; - } - @ProcessElement - public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { - for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; - tracker.tryClaim(i); ++i, ++numIterations) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); + tracker.tryClaim(i); ++i) { c.output(String.valueOf(c.element() + i)); - if (numIterations == numOutputsPerCall) { - return resume(); - } } - return stop(); } @GetInitialRestriction @@ -441,35 +387,10 @@ public class SplittableParDoProcessFnTest { } } - public void testResumeCarriesOverState() throws Exception { - DoFn<Integer, String> fn = new CounterFn(1); - Instant base = Instant.now(); - ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester = - new ProcessFnTester<>( - base, - fn, - BigEndianIntegerCoder.of(), - SerializableCoder.of(OffsetRange.class), - MAX_OUTPUTS_PER_BUNDLE, - MAX_BUNDLE_DURATION); - - tester.startElement(42, new OffsetRange(0, 3)); - assertThat(tester.takeOutputElements(), 
contains("42")); - assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); - assertThat(tester.takeOutputElements(), contains("43")); - assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); - assertThat(tester.takeOutputElements(), contains("44")); - assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); - // After outputting all 3 items, should not output anything more. - assertEquals(0, tester.takeOutputElements().size()); - // Should also not ask to resume. - assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); - } - @Test public void testCheckpointsAfterNumOutputs() throws Exception { int max = 100; - DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE); + DoFn<Integer, String> fn = new CounterFn(); Instant base = Instant.now(); int baseIndex = 42; @@ -511,7 +432,7 @@ public class SplittableParDoProcessFnTest { // But bound bundle duration - the bundle should terminate. Duration maxBundleDuration = Duration.standardSeconds(1); // Create an fn that attempts to 2x output more than checkpointing allows. - DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE); + DoFn<Integer, String> fn = new CounterFn(); Instant base = Instant.now(); int baseIndex = 42; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java deleted file mode 100644 index ae07fe6..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java +++ /dev/null @@ -1,613 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItems; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Iterables; -import java.util.Arrays; -import java.util.Map; -import java.util.Objects; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for {@link StateInternals}. - */ -public abstract class StateInternalsTest { - - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<SetState<String>> STRING_SET_ADDR = - StateTags.set("stringSet", StringUtf8Coder.of()); - private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = - StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - private StateInternals underTest; - - @Before - public void setUp() { - this.underTest = createStateInternals(); - } - - protected abstract StateInternals createStateInternals(); - - @Test - public 
void testValue() throws Exception { - ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value)); - assertThat( - underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), - Matchers.not(equalTo(value))); - - assertThat(value.read(), Matchers.nullValue()); - value.write("hello"); - assertThat(value.read(), equalTo("hello")); - value.write("world"); - assertThat(value.read(), equalTo("world")); - - value.clear(); - assertThat(value.read(), Matchers.nullValue()); - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value)); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), equalTo(value)); - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = 
underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // The merge target (bag1) reads the contents of both bags; the other source reads empty. - assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // The merge target (bag3) reads the contents of both populated bags; those bags read empty afterwards. - assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testSet() throws Exception { - - SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR)))); - - // empty - assertThat(value.read(), Matchers.emptyIterable()); - assertFalse(value.contains("A").read()); - - // add: re-adding "A" leaves a single copy, and addIfAbsent reads false for the already-present "B" - value.add("A"); - value.add("B"); - value.add("A"); - assertFalse(value.addIfAbsent("B").read()); - assertThat(value.read(), containsInAnyOrder("A", "B")); - - // remove: removing the absent "C" leaves the set unchanged - value.remove("A"); - assertThat(value.read(), containsInAnyOrder("B")); - value.remove("C"); - assertThat(value.read(), containsInAnyOrder("B")); - - // contains - assertFalse(value.contains("A").read()); - assertTrue(value.contains("B").read()); - value.add("C"); - value.add("D"); - - // readLater - assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D")); - SetState<String> later = value.readLater(); - assertThat(later.read(), hasItems("C", "D")); - assertFalse(later.contains("A").read()); - - // clear - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), equalTo(value)); - - } - - @Test - public void testSetIsEmpty() throws Exception { - - SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - // readFuture is obtained once and re-read after each change; it is expected to track the current contents. - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeSetIntoSource() throws Exception { - - SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); - - set1.add("Hello"); - set2.add("Hello"); - set2.add("World"); - set1.add("!"); - - StateMerging.mergeSets(Arrays.asList(set1, set2), set1); - - // The merge target (set1) reads the union of both sets; the other source reads empty. - assertThat(set1.read(), containsInAnyOrder("Hello", 
"World", "!")); - assertThat(set2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeSetIntoNewNamespace() throws Exception { - - SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); - SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR); - - set1.add("Hello"); - set2.add("Hello"); - set2.add("World"); - set1.add("!"); - - StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3); - - // The merge target (set3) reads the union of both populated sets; those sets read empty afterwards. - assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(set1.read(), Matchers.emptyIterable()); - assertThat(set2.read(), Matchers.emptyIterable()); - } - - // Expected-entry helper for testMap: a minimal Map.Entry whose equals() accepts any Map.Entry with an equal key and value, and whose hashCode() XORs the key and value hashes. - private static class MapEntry<K, V> implements Map.Entry<K, V> { - private K key; - private V value; - - private MapEntry(K key, V value) { - this.key = key; - this.value = value; - } - - static <K, V> Map.Entry<K, V> of(K k, V v) { - return new MapEntry<>(k, v); - } - - public final K getKey() { - return key; - } - public final V getValue() { - return value; - } - - public final String toString() { - return key + "=" + value; - } - - public final int hashCode() { - return Objects.hashCode(key) ^ Objects.hashCode(value); - } - - public final V setValue(V newValue) { - V oldValue = value; - value = newValue; - return oldValue; - } - - public final boolean equals(Object o) { - if (o == this) { - return true; - } - if (o instanceof Map.Entry) { - Map.Entry<?, ?> e = (Map.Entry<?, ?>) o; - if (Objects.equals(key, e.getKey()) - && Objects.equals(value, e.getValue())) { - return true; - } - } - return false; - } - } - - @Test - public void testMap() throws Exception { - - MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR)))); - - // put - assertThat(value.entries().read(), Matchers.emptyIterable()); - value.put("A", 1); - value.put("B", 2); - value.put("A", 11); - assertThat(value.putIfAbsent("B", 22).read(), equalTo(2)); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), - MapEntry.of("B", 2))); - - // remove - value.remove("A"); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); - value.remove("C"); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); - - // get - assertNull(value.get("A").read()); - assertThat(value.get("B").read(), equalTo(2)); - value.put("C", 3); - value.put("D", 4); - assertThat(value.get("C").read(), equalTo(3)); - - // iterate - value.put("E", 5); - value.remove("C"); - assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E")); - assertThat(value.values().read(), containsInAnyOrder(2, 4, 5)); - assertThat( - value.entries().read(), - containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); - - // readLater - assertThat(value.get("B").readLater().read(), equalTo(2)); - assertNull(value.get("A").readLater().read()); - assertThat( - value.entries().readLater().read(), - containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); - - // clear - value.clear(); - assertThat(value.entries().read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), equalTo(value)); - } - - @Test - public void testCombiningValue() throws Exception { - - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), equalTo(0)); - value.add(2); - assertThat(value.read(), equalTo(2)); - - value.add(3); - assertThat(value.read(), equalTo(5)); - - value.clear(); - assertThat(value.read(), equalTo(0)); - assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), equalTo(value)); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), equalTo(11)); - assertThat(value2.read(), equalTo(10)); - - // Merging clears the old values and updates the result value. 
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), equalTo(21)); - assertThat(value2.read(), equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), equalTo(0)); - assertThat(value2.read(), equalTo(0)); - assertThat(value3.read(), equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), equalTo(value)); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), equalTo(value)); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), equalTo(value)); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new 
Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), equalTo(new Instant(2000))); - assertThat(value2.read(), equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // NOTE(review): despite the method name, the merge result is value3 (NAMESPACE_3), not one of the sources. - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // The result reads the latest hold that was added (5000); both sources read null. - assertThat(value3.read(), equalTo(new Instant(5000))); - assertThat(value1.read(), equalTo(null)); - assertThat(value2.read(), equalTo(null)); - } - - @Test - public void testSetReadable() throws Exception { - SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - - // test contains: a result obtained before the add is expected to still read false afterwards - ReadableState<Boolean> readable = value.contains("A"); - value.add("A"); - assertFalse(readable.read()); - - // test addIfAbsent - value.addIfAbsent("B"); - assertTrue(value.contains("B").read()); - } - - @Test - public void testMapReadable() throws Exception { - MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR); - - // test iterables: keys(), values() and entries() should each return an iterable view of this map. - // The views are obtained before the put and are expected to reflect it when read afterwards. 
- ReadableState<Iterable<String>> keys = value.keys(); - ReadableState<Iterable<Integer>> values = value.values(); - ReadableState<Iterable<Map.Entry<String, Integer>>> entries = value.entries(); - value.put("A", 1); - assertFalse(Iterables.isEmpty(keys.read())); - assertFalse(Iterables.isEmpty(values.read())); - assertFalse(Iterables.isEmpty(entries.read())); - - // test get: a result obtained before the put is expected to still read null afterwards - ReadableState<Integer> get = value.get("B"); - value.put("B", 2); - assertNull(get.read()); - - // test putIfAbsent - value.putIfAbsent("C", 3); - assertThat(value.get("C").read(), equalTo(3)); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java index 26cbfee..9769d10 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java @@ -116,21 +116,6 @@ public class WindowMatchers { } public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - Matcher<T> valueMatcher, - long timestamp, - long windowStart, - long windowEnd, - PaneInfo paneInfo) { - IntervalWindow intervalWindow = - new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)); - return WindowMatchers.<T>isSingleWindowedValue( - valueMatcher, - Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp), - Matchers.<BoundedWindow>equalTo(intervalWindow), - Matchers.equalTo(paneInfo)); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher, Matcher<? 
super BoundedWindow> windowMatcher) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java index 2be90de..453c8ff 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -41,8 +42,8 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class AfterFirstStateMachineTest { - @Mock private TriggerStateMachine mockTrigger1; - @Mock private TriggerStateMachine mockTrigger2; + @Mock private OnceTriggerStateMachine mockTrigger1; + @Mock private OnceTriggerStateMachine mockTrigger2; private SimpleTriggerStateMachineTester<IntervalWindow> tester; private static TriggerStateMachine.TriggerContext anyTriggerContext() { return Mockito.<TriggerStateMachine.TriggerContext>any();
