Clarify semantics of objects returned by state access

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77840fa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77840fa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77840fa3

Branch: refs/heads/master
Commit: 77840fa3565f6e0ba625556b3fcaff9fa408aca2
Parents: aed6773
Author: Daniel Mills <[email protected]>
Authored: Wed Sep 20 16:35:06 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Thu Sep 21 15:32:42 2017 -0700

----------------------------------------------------------------------
 .../runners/core/InMemoryStateInternals.java    | 39 +++++++++--
 .../CopyOnAccessInMemoryStateInternalsTest.java | 74 +++++++++++---------
 .../apache/beam/sdk/state/GroupingState.java    | 12 +++-
 .../org/apache/beam/sdk/state/MapState.java     | 20 +++++-
 .../apache/beam/sdk/state/ReadableState.java    |  4 ++
 .../org/apache/beam/sdk/state/SetState.java     | 10 ++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 44 +++++++++---
 7 files changed, 148 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 59814bc..075e264 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -17,8 +17,12 @@
  */
 package org.apache.beam.runners.core;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -326,7 +330,8 @@ public class InMemoryStateInternals<K> implements 
StateInternals {
 
     @Override
     public OutputT read() {
-      return combineFn.extractOutput(accum);
+      return combineFn.extractOutput(
+          
combineFn.mergeAccumulators(Arrays.asList(combineFn.createAccumulator(), 
accum)));
     }
 
     @Override
@@ -407,7 +412,7 @@ public class InMemoryStateInternals<K> implements 
StateInternals {
 
     @Override
     public Iterable<T> read() {
-      return contents;
+      return Iterables.limit(contents, contents.size());
     }
 
     @Override
@@ -478,7 +483,7 @@ public class InMemoryStateInternals<K> implements 
StateInternals {
 
     @Override
     public Iterable<T> read() {
-      return contents;
+      return ImmutableSet.copyOf(contents);
     }
 
     @Override
@@ -551,19 +556,41 @@ public class InMemoryStateInternals<K> implements 
StateInternals {
       contents.remove(key);
     }
 
+    private static class CollectionViewState<T> implements 
ReadableState<Iterable<T>> {
+      private final Collection<T> collection;
+
+      private CollectionViewState(Collection<T> collection) {
+        this.collection = collection;
+      }
+
+      public static <T> CollectionViewState<T> of(Collection<T> collection) {
+        return new CollectionViewState<>(collection);
+      }
+
+      @Override
+      public Iterable<T> read() {
+        return ImmutableList.copyOf(collection);
+      }
+
+      @Override
+      public ReadableState<Iterable<T>> readLater() {
+        return this;
+      }
+    }
+
     @Override
     public ReadableState<Iterable<K>> keys() {
-      return ReadableStates.immediate((Iterable<K>) contents.keySet());
+      return CollectionViewState.of(contents.keySet());
     }
 
     @Override
     public ReadableState<Iterable<V>> values() {
-      return ReadableStates.immediate((Iterable<V>) contents.values());
+      return CollectionViewState.of(contents.values());
     }
 
     @Override
     public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-      return ReadableStates.immediate((Iterable<Map.Entry<K, V>>) 
contents.entrySet());
+      return CollectionViewState.of(contents.entrySet());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 1e60ca3..657bb7f 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Lists;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -63,8 +64,10 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class CopyOnAccessInMemoryStateInternalsTest {
 
-  @Rule public final TestPipeline pipeline = TestPipeline.create();
-  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
   private String key = "foo";
 
   @Test
@@ -114,7 +117,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
    */
   @Test
   public void testGetWithPresentInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -125,7 +128,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.write("bar");
     assertThat(underlyingValue.read(), equalTo("bar"));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     ValueState<String> copyOnAccessState = internals.state(namespace, 
valueTag);
     assertThat(copyOnAccessState.read(), equalTo("bar"));
@@ -140,7 +143,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testBagStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -151,7 +154,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -161,12 +164,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
     BagState<Integer> reReadUnderlyingValue = underlying.state(namespace, 
valueTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
+    assertThat(Lists.newArrayList(underlyingValue.read()),
+        equalTo(Lists.newArrayList(reReadUnderlyingValue.read())));
   }
 
   @Test
   public void testSetStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -177,7 +181,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -192,7 +196,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testMapStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -204,7 +208,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.put("hello", 1);
     assertThat(underlyingValue.get("hello").read(), equalTo(1));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     MapState<String, Integer> copyOnAccessState = internals.state(namespace, 
valueTag);
     assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
@@ -221,7 +225,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testAccumulatorCombiningStateWithUnderlying() throws 
CannotProvideCoderException {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
 
@@ -236,7 +240,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(1L);
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, 
stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
@@ -251,7 +255,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkHoldStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
@@ -265,7 +269,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(new Instant(250L));
     assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     WatermarkHoldState copyOnAccessState = internals.state(namespace, 
stateTag);
     assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
@@ -284,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithoutUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     StateNamespace namespace = new StateNamespaceForTest("foo");
     StateTag<BagState<String>> bagTag = StateTags.bag("foo", 
StringUtf8Coder.of());
@@ -304,9 +308,9 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -331,11 +335,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithClearedInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String>secondUnderlying =
+    CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
         spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, 
underlying));
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, 
secondUnderlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -361,9 +365,9 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOverwrittenUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -392,9 +396,9 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithAddedUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     internals.commit();
@@ -416,7 +420,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyTableIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals.commit();
@@ -426,7 +430,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOnlyClearedValuesIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -444,9 +448,9 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
@@ -475,7 +479,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
 
     StateTag<WatermarkHoldState> firstHoldAddress =
@@ -508,7 +512,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
     StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
@@ -516,7 +520,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         underlying.state(StateNamespaces.window(null, first), 
firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", 
underlying.commit());
 
     StateTag<WatermarkHoldState> secondHoldAddress =
@@ -545,7 +549,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
             return new Instant(689743L);
           }
         };
-    CopyOnAccessInMemoryStateInternals<String>underlying =
+    CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
     StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
@@ -553,7 +557,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         underlying.state(StateNamespaces.window(null, first), 
firstHoldAddress);
     firstHold.add(new Instant(224L));
 
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", 
underlying.commit());
 
     StateTag<WatermarkHoldState> secondHoldAddress =
@@ -568,7 +572,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testGetEarliestHoldBeforeCommit() {
-    CopyOnAccessInMemoryStateInternals<String>internals =
+    CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
index 9c4c23e..8f244d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java
@@ -33,10 +33,18 @@ import org.apache.beam.sdk.transforms.GroupByKey;
  */
 @Experimental(Kind.STATE)
 public interface GroupingState<InputT, OutputT> extends 
ReadableState<OutputT>, State {
-  /** Add a value to the buffer. */
+  /**
+   * Add a value to the buffer.
+   *
+   * <p>Elements added will not be reflected in {@code OutputT} objects 
returned by
+   * previous calls to {@link #read}.
+   */
   void add(InputT value);
 
-  /** Return true if this state is empty. */
+  /**
+   * Returns a {@link ReadableState} whose {@link #read} method will return 
true if this state is
+   * empty at the point when that {@link #read} call returns.
+   */
   ReadableState<Boolean> isEmpty();
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
index 17ea332..8b89d7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
@@ -33,7 +33,13 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 @Experimental(Kind.STATE)
 public interface MapState<K, V> extends State {
 
-  /** Associates the specified value with the specified key in this state. */
+  /**
+   * Associates the specified value with the specified key in this state.
+   *
+   * <p>Changes will not be reflected in the results returned by
+   * previous calls to {@link ReadableState#read} on the results of any of the 
reading methods
+   * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}).
+   */
   void put(K key, V value);
 
   /**
@@ -44,10 +50,20 @@ public interface MapState<K, V> extends State {
    *
    * <p>If the specified key is not already associated with a value (or is 
mapped to {@code null})
    * associates it with the given value and returns {@code null}, else returns 
the current value.
+   *
+   * <p>Changes will not be reflected in the results returned by
+   * previous calls to {@link ReadableState#read} on the results of any of the 
reading methods
+   * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}).
    */
   ReadableState<V> putIfAbsent(K key, V value);
 
-  /** Remove the mapping for a key from this map if it is present. */
+  /**
+   * Remove the mapping for a key from this map if it is present.
+   *
+   * <p>Changes will not be reflected in the results returned by
+   * previous calls to {@link ReadableState#read} on the results of any of the 
reading methods
+   * ({@link #get}, {@link #keys}, {@link #values}, and {@link #entries}).
+   */
   void remove(K key);
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
index 70703ce..f2774ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java
@@ -36,6 +36,10 @@ public interface ReadableState<T> {
    * <p>If there will be many calls to {@link #read} for different state in 
short succession, you
    * should first call {@link #readLater} for all of them so the reads can 
potentially be batched
   * (depending on the underlying implementation).
+   *
+   * <p>The returned object should be independent of the underlying state.  
Any direct modification
+   * of the returned object should not modify state without going through the 
appropriate state
+   * interface, and modification to the state should not be mirrored in the 
returned object.
    */
   T read();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
index fd339b2..d94c5c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java
@@ -36,10 +36,18 @@ public interface SetState<T> extends GroupingState<T, 
Iterable<T>> {
   /**
    * Ensures a value is a member of the set, returning {@code true} if it was 
added and {@code
    * false} otherwise.
+   *
+   * <p>Elements added will not be reflected in the {@code Iterable<T>} objects 
returned by
+   * previous calls to {@link #read}.
    */
   ReadableState<Boolean> addIfAbsent(T t);
 
-  /** Removes the specified element from this set if it is present. */
+  /**
+   * Removes the specified element from this set if it is present.
+   *
+   * <p>Changes will not be reflected in the {@code Iterable<T>} objects returned by
+   * previous calls to {@link #read}.
+   */
   void remove(T t);
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/77840fa3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 142dff8..03e3104 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.not;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -67,6 +69,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -1983,9 +1986,16 @@ public class ParDoTest implements Serializable {
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) BagState<Integer> state) {
-            Iterable<Integer> currentValue = state.read();
+            ReadableState<Boolean> isEmpty = state.isEmpty();
             state.add(c.element().getValue());
-            if (Iterables.size(state.read()) >= 4) {
+            assertFalse(isEmpty.read());
+            Iterable<Integer> currentValue = state.read();
+            if (Iterables.size(currentValue) >= 4) {
+              // Make sure that the cached Iterable doesn't change when new 
elements are added.
+              state.add(-1);
+              assertEquals(4, Iterables.size(currentValue));
+              assertEquals(5, Iterables.size(state.read()));
+
               List<Integer> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
               c.output(sorted);
@@ -2020,9 +2030,9 @@ public class ParDoTest implements Serializable {
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) BagState<MyInteger> state) {
-            Iterable<MyInteger> currentValue = state.read();
             state.add(new MyInteger(c.element().getValue()));
-            if (Iterables.size(state.read()) >= 4) {
+            Iterable<MyInteger> currentValue = state.read();
+            if (Iterables.size(currentValue) >= 4) {
               List<MyInteger> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
               c.output(sorted);
@@ -2058,9 +2068,9 @@ public class ParDoTest implements Serializable {
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) BagState<MyInteger> state) {
-            Iterable<MyInteger> currentValue = state.read();
             state.add(new MyInteger(c.element().getValue()));
-            if (Iterables.size(state.read()) >= 4) {
+            Iterable<MyInteger> currentValue = state.read();
+            if (Iterables.size(currentValue) >= 4) {
               List<MyInteger> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
               c.output(sorted);
@@ -2102,10 +2112,18 @@ public class ParDoTest implements Serializable {
               @StateId(stateId) SetState<Integer> state,
               @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
+            ReadableState<Boolean> isEmpty = state.isEmpty();
             state.add(c.element().getValue());
+            assertFalse(isEmpty.read());
             count.add(1);
             if (count.read() >= 4) {
-              Set<Integer> set = Sets.newHashSet(state.read());
+              // Make sure that the cached Iterable doesn't change when new 
elements are added.
+              Iterable<Integer> ints = state.read();
+              state.add(-1);
+              assertEquals(3, Iterables.size(ints));
+              assertEquals(4, Iterables.size(state.read()));
+
+              Set<Integer> set = Sets.newHashSet(ints);
               c.output(set);
             }
           }
@@ -2231,10 +2249,18 @@ public class ParDoTest implements Serializable {
               @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
+            ReadableState<Iterable<Entry<String, Integer>>> entriesView = 
state.entries();
             state.put(value.getKey(), value.getValue());
             count.add(1);
             if (count.read() >= 4) {
               Iterable<Map.Entry<String, Integer>> iterate = 
state.entries().read();
+              // Make sure that the cached Iterable doesn't change when new 
elements are added, but
+              // that cached ReadableState views of the state do change.
+              state.put("BadKey", -1);
+              assertEquals(3, Iterables.size(iterate));
+              assertEquals(4, Iterables.size(entriesView.read()));
+              assertEquals(4, Iterables.size(state.entries().read()));
+
               for (Map.Entry<String, Integer> entry : iterate) {
                 c.output(KV.of(entry.getKey(), entry.getValue()));
               }
@@ -2525,9 +2551,9 @@ public class ParDoTest implements Serializable {
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) BagState<Integer> state) {
-            Iterable<Integer> currentValue = state.read();
             state.add(c.element().getValue());
-            if (Iterables.size(state.read()) >= 4) {
+            Iterable<Integer> currentValue = state.read();
+            if (Iterables.size(currentValue) >= 4) {
               List<Integer> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
               c.output(sorted);

Reply via email to