Clarify semantics of objects returned by state access
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77840fa3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77840fa3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77840fa3 Branch: refs/heads/master Commit: 77840fa3565f6e0ba625556b3fcaff9fa408aca2 Parents: aed6773 Author: Daniel Mills <[email protected]> Authored: Wed Sep 20 16:35:06 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Sep 21 15:32:42 2017 -0700 ---------------------------------------------------------------------- .../runners/core/InMemoryStateInternals.java | 39 +++++++++-- .../CopyOnAccessInMemoryStateInternalsTest.java | 74 +++++++++++--------- .../apache/beam/sdk/state/GroupingState.java | 12 +++- .../org/apache/beam/sdk/state/MapState.java | 20 +++++- .../apache/beam/sdk/state/ReadableState.java | 4 ++ .../org/apache/beam/sdk/state/SetState.java | 10 ++- .../apache/beam/sdk/transforms/ParDoTest.java | 44 +++++++++--- 7 files changed, 148 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 59814bc..075e264 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -17,8 +17,12 @@ */ package org.apache.beam.runners.core; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import java.util.ArrayList; import 
java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -326,7 +330,8 @@ public class InMemoryStateInternals<K> implements StateInternals { @Override public OutputT read() { - return combineFn.extractOutput(accum); + return combineFn.extractOutput( + combineFn.mergeAccumulators(Arrays.asList(combineFn.createAccumulator(), accum))); } @Override @@ -407,7 +412,7 @@ public class InMemoryStateInternals<K> implements StateInternals { @Override public Iterable<T> read() { - return contents; + return Iterables.limit(contents, contents.size()); } @Override @@ -478,7 +483,7 @@ public class InMemoryStateInternals<K> implements StateInternals { @Override public Iterable<T> read() { - return contents; + return ImmutableSet.copyOf(contents); } @Override @@ -551,19 +556,41 @@ public class InMemoryStateInternals<K> implements StateInternals { contents.remove(key); } + private static class CollectionViewState<T> implements ReadableState<Iterable<T>> { + private final Collection<T> collection; + + private CollectionViewState(Collection<T> collection) { + this.collection = collection; + } + + public static <T> CollectionViewState<T> of(Collection<T> collection) { + return new CollectionViewState<>(collection); + } + + @Override + public Iterable<T> read() { + return ImmutableList.copyOf(collection); + } + + @Override + public ReadableState<Iterable<T>> readLater() { + return this; + } + } + @Override public ReadableState<Iterable<K>> keys() { - return ReadableStates.immediate((Iterable<K>) contents.keySet()); + return CollectionViewState.of(contents.keySet()); } @Override public ReadableState<Iterable<V>> values() { - return ReadableStates.immediate((Iterable<V>) contents.values()); + return CollectionViewState.of(contents.values()); } @Override public ReadableState<Iterable<Map.Entry<K, V>>> entries() { - return ReadableStates.immediate((Iterable<Map.Entry<K, V>>) contents.entrySet()); + return 
CollectionViewState.of(contents.entrySet()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 1e60ca3..657bb7f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import com.google.common.collect.Lists; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; @@ -63,8 +64,10 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class CopyOnAccessInMemoryStateInternalsTest { - @Rule public final TestPipeline pipeline = TestPipeline.create(); - @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); private String key = "foo"; @Test @@ -114,7 +117,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { */ @Test public void testGetWithPresentInUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -125,7 +128,7 @@ public class 
CopyOnAccessInMemoryStateInternalsTest { underlyingValue.write("bar"); assertThat(underlyingValue.read(), equalTo("bar")); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); ValueState<String> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), equalTo("bar")); @@ -140,7 +143,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testBagStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -151,7 +154,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(1); assertThat(underlyingValue.read(), containsInAnyOrder(1)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), containsInAnyOrder(1)); @@ -161,12 +164,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { assertThat(underlyingValue.read(), containsInAnyOrder(1)); BagState<Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag); - assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + assertThat(Lists.newArrayList(underlyingValue.read()), + equalTo(Lists.newArrayList(reReadUnderlyingValue.read()))); } @Test public void testSetStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -177,7 +181,7 @@ public 
class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(1); assertThat(underlyingValue.read(), containsInAnyOrder(1)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), containsInAnyOrder(1)); @@ -192,7 +196,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testMapStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -204,7 +208,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.put("hello", 1); assertThat(underlyingValue.get("hello").read(), equalTo(1)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.get("hello").read(), equalTo(1)); @@ -221,7 +225,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs(); @@ -236,7 +240,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(1L); assertThat(underlyingValue.read(), equalTo(1L)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = 
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(1L)); @@ -251,7 +255,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testWatermarkHoldStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST; @@ -265,7 +269,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(new Instant(250L)); assertThat(underlyingValue.read(), equalTo(new Instant(250L))); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(new Instant(250L))); @@ -284,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithoutUnderlying() { - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); @@ -304,9 +308,9 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); 
StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -331,11 +335,11 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithClearedInUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String>secondUnderlying = + CopyOnAccessInMemoryStateInternals<String> secondUnderlying = spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -361,9 +365,9 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithOverwrittenUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -392,9 +396,9 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithAddedUnderlying() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); internals.commit(); @@ -416,7 +420,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithEmptyTableIsEmpty() 
{ - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); internals.commit(); @@ -426,7 +430,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithOnlyClearedValuesIsEmpty() { - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -444,9 +448,9 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() { - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); @@ -475,7 +479,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); StateTag<WatermarkHoldState> firstHoldAddress = @@ -508,7 +512,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); StateTag<WatermarkHoldState> firstHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); @@ -516,7 +520,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlying.state(StateNamespaces.window(null, first), 
firstHoldAddress); firstHold.add(new Instant(22L)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); StateTag<WatermarkHoldState> secondHoldAddress = @@ -545,7 +549,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String>underlying = + CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); StateTag<WatermarkHoldState> firstHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); @@ -553,7 +557,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(224L)); - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); StateTag<WatermarkHoldState> secondHoldAddress = @@ -568,7 +572,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testGetEarliestHoldBeforeCommit() { - CopyOnAccessInMemoryStateInternals<String>internals = + CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); internals http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java index 9c4c23e..8f244d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java @@ -33,10 
+33,18 @@ import org.apache.beam.sdk.transforms.GroupByKey; */ @Experimental(Kind.STATE) public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { - /** Add a value to the buffer. */ + /** + * Add a value to the buffer. + * + * <p>Elements added will not be reflected in {@code OutputT} objects returned by + * previous calls to {@link #read}. + */ void add(InputT value); - /** Return true if this state is empty. */ + /** + * Returns a {@link ReadableState} whose {@link #read} method will return true if this state is + * empty at the point when that {@link #read} call returns. + */ ReadableState<Boolean> isEmpty(); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java index 17ea332..8b89d7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java @@ -33,7 +33,13 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; @Experimental(Kind.STATE) public interface MapState<K, V> extends State { - /** Associates the specified value with the specified key in this state. */ + /** + * Associates the specified value with the specified key in this state. + * + * <p>Changes will not be reflected in the results returned by + * previous calls to {@link ReadableState#read} on the results of any of the reading methods + * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}). 
+ */ void put(K key, V value); /** @@ -44,10 +50,20 @@ public interface MapState<K, V> extends State { * * <p>If the specified key is not already associated with a value (or is mapped to {@code null}) * associates it with the given value and returns {@code null}, else returns the current value. + * + * <p>Changes will not be reflected in the results returned by + * previous calls to {@link ReadableState#read} on the results of any of the reading methods + * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}). */ ReadableState<V> putIfAbsent(K key, V value); - /** Remove the mapping for a key from this map if it is present. */ + /** + * Remove the mapping for a key from this map if it is present. + * + * <p>Changes will not be reflected in the results returned by + * previous calls to {@link ReadableState#read} on the results of any of the reading methods + * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}). + */ void remove(K key); /** http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java index 70703ce..f2774ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java @@ -36,6 +36,10 @@ public interface ReadableState<T> { * <p>If there will be many calls to {@link #read} for different state in short succession, you * should first call {@link #readLater} for all of them so the reads can potentially be batched * (depending on the underlying implementation). + * + * <p>The returned object should be independent of the underlying state. 
Any direct modification + * of the returned object should not modify state without going through the appropriate state + * interface, and modification to the state should not be mirrored in the returned object. */ T read(); http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java index fd339b2..d94c5c1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java @@ -36,10 +36,18 @@ public interface SetState<T> extends GroupingState<T, Iterable<T>> { /** * Ensures a value is a member of the set, returning {@code true} if it was added and {@code * false} otherwise. + * + * <p>Elements added will not be reflected in {@code OutputT} objects returned by + * previous calls to {@link #read}. */ ReadableState<Boolean> addIfAbsent(T t); - /** Removes the specified element from this set if it is present. */ + /** + * Removes the specified element from this set if it is present. + * + * <p>Changes will not be reflected in {@code OutputT} objects returned by + * previous calls to {@link #read}. 
+ */ void remove(T t); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 142dff8..03e3104 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import com.fasterxml.jackson.annotation.JsonCreator; @@ -52,6 +53,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; @@ -67,6 +69,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -1983,9 +1986,16 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) BagState<Integer> state) { - Iterable<Integer> currentValue = state.read(); + ReadableState<Boolean> isEmpty = state.isEmpty(); state.add(c.element().getValue()); - if (Iterables.size(state.read()) >= 4) { + 
assertFalse(isEmpty.read()); + Iterable<Integer> currentValue = state.read(); + if (Iterables.size(currentValue) >= 4) { + // Make sure that the cached Iterable doesn't change when new elements are added. + state.add(-1); + assertEquals(4, Iterables.size(currentValue)); + assertEquals(5, Iterables.size(state.read())); + List<Integer> sorted = Lists.newArrayList(currentValue); Collections.sort(sorted); c.output(sorted); @@ -2020,9 +2030,9 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) BagState<MyInteger> state) { - Iterable<MyInteger> currentValue = state.read(); state.add(new MyInteger(c.element().getValue())); - if (Iterables.size(state.read()) >= 4) { + Iterable<MyInteger> currentValue = state.read(); + if (Iterables.size(currentValue) >= 4) { List<MyInteger> sorted = Lists.newArrayList(currentValue); Collections.sort(sorted); c.output(sorted); @@ -2058,9 +2068,9 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) BagState<MyInteger> state) { - Iterable<MyInteger> currentValue = state.read(); state.add(new MyInteger(c.element().getValue())); - if (Iterables.size(state.read()) >= 4) { + Iterable<MyInteger> currentValue = state.read(); + if (Iterables.size(currentValue) >= 4) { List<MyInteger> sorted = Lists.newArrayList(currentValue); Collections.sort(sorted); c.output(sorted); @@ -2102,10 +2112,18 @@ public class ParDoTest implements Serializable { @StateId(stateId) SetState<Integer> state, @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { + ReadableState<Boolean> isEmpty = state.isEmpty(); state.add(c.element().getValue()); + assertFalse(isEmpty.read()); count.add(1); if (count.read() >= 4) { - Set<Integer> set = Sets.newHashSet(state.read()); + // Make sure that the cached Iterable doesn't change when new elements are added. 
+ Iterable<Integer> ints = state.read(); + state.add(-1); + assertEquals(3, Iterables.size(ints)); + assertEquals(4, Iterables.size(state.read())); + + Set<Integer> set = Sets.newHashSet(ints); c.output(set); } } @@ -2231,10 +2249,18 @@ public class ParDoTest implements Serializable { @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { KV<String, Integer> value = c.element().getValue(); + ReadableState<Iterable<Entry<String, Integer>>> entriesView = state.entries(); state.put(value.getKey(), value.getValue()); count.add(1); if (count.read() >= 4) { Iterable<Map.Entry<String, Integer>> iterate = state.entries().read(); + // Make sure that the cached Iterable doesn't change when new elements are added, but + // that cached ReadableState views of the state do change. + state.put("BadKey", -1); + assertEquals(3, Iterables.size(iterate)); + assertEquals(4, Iterables.size(entriesView.read())); + assertEquals(4, Iterables.size(state.entries().read())); + for (Map.Entry<String, Integer> entry : iterate) { c.output(KV.of(entry.getKey(), entry.getValue())); } @@ -2525,9 +2551,9 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) BagState<Integer> state) { - Iterable<Integer> currentValue = state.read(); state.add(c.element().getValue()); - if (Iterables.size(state.read()) >= 4) { + Iterable<Integer> currentValue = state.read(); + if (Iterables.size(currentValue) >= 4) { List<Integer> sorted = Lists.newArrayList(currentValue); Collections.sort(sorted); c.output(sorted);
