Repository: beam Updated Branches: refs/heads/master b92032ff6 -> a8edbb81f
Revise MapState and SetState APIs to leverage ReadableState Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/604be670 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/604be670 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/604be670 Branch: refs/heads/master Commit: 604be6703daadfcf085d69ee2859577218d6b3d4 Parents: b92032f Author: Kenneth Knowles <[email protected]> Authored: Tue Mar 28 20:08:45 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Apr 6 15:36:09 2017 -0700 ---------------------------------------------------------------------- .../runners/core/InMemoryStateInternals.java | 84 ++++--------------- .../core/InMemoryStateInternalsTest.java | 87 +++++++++----------- .../CopyOnAccessInMemoryStateInternalsTest.java | 16 ++-- .../apache/beam/sdk/util/state/MapState.java | 52 +++++------- .../beam/sdk/util/state/ReadableStates.java | 45 ++++++++++ .../apache/beam/sdk/util/state/SetState.java | 34 +------- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +- 7 files changed, 134 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 0d5b058..55b7fc2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkNotNull; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -42,6 +40,7 @@ import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.ReadableStates; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; @@ -468,13 +467,15 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public boolean contains(T t) { - return contents.contains(t); + public ReadableState<Boolean> contains(T t) { + return ReadableStates.immediate(contents.contains(t)); } @Override - public boolean addIfAbsent(T t) { - return contents.add(t); + public ReadableState<Boolean> addIfAbsent(T t) { + boolean alreadyContained = contents.contains(t); + contents.add(t); + return ReadableStates.immediate(!alreadyContained); } @Override @@ -483,33 +484,6 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public SetState<T> readLater(Iterable<T> elements) { - return this; - } - - @Override - public boolean containsAny(Iterable<T> elements) { - elements = checkNotNull(elements); - for (T t : elements) { - if (contents.contains(t)) { - return true; - } - } - return false; - } - - @Override - public boolean containsAll(Iterable<T> elements) { - elements = checkNotNull(elements); - for (T t : elements) { - if (!contents.contains(t)) { - return false; - } - } - return true; - } - - @Override public InMemorySet<T> readLater() { return this; } @@ -565,8 +539,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public V get(K key) { - return contents.get(key); + public ReadableState<V> get(K key) { + return ReadableStates.immediate(contents.get(key)); } @Override @@ -575,13 +549,13 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public V putIfAbsent(K key, V value) { + public ReadableState<V> putIfAbsent(K key, V value) { V v = contents.get(key); if (v == null) { v = contents.put(key, value); } - return v; + return ReadableStates.immediate(v); } @Override @@ -590,42 +564,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public Iterable<V> get(Iterable<K> keys) { - List<V> values = new ArrayList<>(); - for (K k : keys) { - values.add(contents.get(k)); - } - return values; + public ReadableState<Iterable<K>> keys() { + return ReadableStates.immediate((Iterable<K>) contents.keySet()); } @Override - public MapState<K, V> getLater(K k) { - return this; - } - - @Override - public MapState<K, V> getLater(Iterable<K> keys) { - return this; - } - - @Override - public Iterable<K> keys() { - return contents.keySet(); - } - - @Override - public Iterable<V> values() { - return contents.values(); - } - - @Override - public MapState<K, V> iterateLater() { - return this; + public ReadableState<Iterable<V>> values() { + return ReadableStates.immediate((Iterable<V>) contents.values()); } @Override - public Iterable<Map.Entry<K, V>> iterate() { - return contents.entrySet(); + public ReadableState<Iterable<Map.Entry<K, V>>> entries() { + return ReadableStates.immediate((Iterable<Map.Entry<K, V>>) contents.entrySet()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index e4fb5c1..34ddae6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -17,7 +17,9 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -26,7 +28,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.Arrays; -import java.util.Collections; import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -112,10 +113,10 @@ public class InMemoryStateInternalsTest { assertThat(value.read(), Matchers.emptyIterable()); value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + assertThat(value.read(), containsInAnyOrder("hello")); value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + assertThat(value.read(), containsInAnyOrder("hello", "world")); value.clear(); assertThat(value.read(), Matchers.emptyIterable()); @@ -147,7 +148,7 @@ public class InMemoryStateInternalsTest { StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!")); assertThat(bag2.read(), Matchers.emptyIterable()); } @@ -164,7 +165,7 @@ public class InMemoryStateInternalsTest { StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!")); assertThat(bag1.read(), Matchers.emptyIterable()); assertThat(bag2.read(), Matchers.emptyIterable()); } @@ -179,41 +180,32 @@ public class InMemoryStateInternalsTest { // empty assertThat(value.read(), Matchers.emptyIterable()); - assertFalse(value.contains("A")); - assertFalse(value.containsAny(Collections.singletonList("A"))); + assertFalse(value.contains("A").read()); // add value.add("A"); value.add("B"); value.add("A"); - assertFalse(value.addIfAbsent("B")); - assertThat(value.read(), Matchers.containsInAnyOrder("A", "B")); + assertFalse(value.addIfAbsent("B").read()); + assertThat(value.read(), containsInAnyOrder("A", "B")); // remove value.remove("A"); - assertThat(value.read(), Matchers.containsInAnyOrder("B")); + assertThat(value.read(), containsInAnyOrder("B")); value.remove("C"); - assertThat(value.read(), Matchers.containsInAnyOrder("B")); + assertThat(value.read(), containsInAnyOrder("B")); // contains - assertFalse(value.contains("A")); - assertTrue(value.contains("B")); + assertFalse(value.contains("A").read()); + assertTrue(value.contains("B").read()); value.add("C"); value.add("D"); - // containsAny - assertTrue(value.containsAny(Arrays.asList("A", "C"))); - assertFalse(value.containsAny(Arrays.asList("A", "E"))); - - // containsAll - assertTrue(value.containsAll(Arrays.asList("B", "C"))); - assertFalse(value.containsAll(Arrays.asList("A", "B"))); - // readLater - assertThat(value.readLater().read(), Matchers.containsInAnyOrder("B", "C", "D")); - SetState<String> later = value.readLater(Arrays.asList("A", "C", "D")); - assertTrue(later.containsAll(Arrays.asList("C", "D"))); - assertFalse(later.contains("A")); + assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D")); + SetState<String> later = value.readLater(); + assertThat(later.read(), hasItems("C", "D")); + assertFalse(later.contains("A").read()); // clear value.clear(); @@ -248,7 +240,7 @@ public class InMemoryStateInternalsTest { StateMerging.mergeSets(Arrays.asList(set1, set2), set1); // Reading the merged set gets both the contents - assertThat(set1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!")); assertThat(set2.read(), Matchers.emptyIterable()); } @@ -266,7 +258,7 @@ public class InMemoryStateInternalsTest { StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3); // Reading the merged set gets both the contents - assertThat(set3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!")); assertThat(set1.read(), Matchers.emptyIterable()); assertThat(set2.read(), Matchers.emptyIterable()); } @@ -330,49 +322,46 @@ public class InMemoryStateInternalsTest { assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR)))); // put - assertThat(value.iterate(), Matchers.emptyIterable()); + assertThat(value.entries().read(), Matchers.emptyIterable()); value.put("A", 1); value.put("B", 2); value.put("A", 11); - assertThat(value.putIfAbsent("B", 22), equalTo(2)); - assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("A", 11), + assertThat(value.putIfAbsent("B", 22).read(), equalTo(2)); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), MapEntry.of("B", 2))); // remove value.remove("A"); - assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2))); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); value.remove("C"); - assertThat(value.iterate(), Matchers.containsInAnyOrder(MapEntry.of("B", 2))); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); // get - assertNull(value.get("A")); - assertThat(value.get("B"), equalTo(2)); + assertNull(value.get("A").read()); + assertThat(value.get("B").read(), equalTo(2)); value.put("C", 3); value.put("D", 4); - assertThat(value.get("C"), equalTo(3)); - assertThat(value.get(Collections.singletonList("D")), Matchers.containsInAnyOrder(4)); - assertThat(value.get(Arrays.asList("B", "C")), Matchers.containsInAnyOrder(2, 3)); + assertThat(value.get("C").read(), equalTo(3)); // iterate value.put("E", 5); value.remove("C"); - assertThat(value.keys(), Matchers.containsInAnyOrder("B", "D", "E")); - assertThat(value.values(), Matchers.containsInAnyOrder(2, 4, 5)); - assertThat(value.iterate(), Matchers.containsInAnyOrder( - MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E")); + assertThat(value.values().read(), containsInAnyOrder(2, 4, 5)); + assertThat( + value.entries().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); // readLater - assertThat(value.getLater("B").get("B"), equalTo(2)); - assertNull(value.getLater("A").get("A")); - MapState<String, Integer> later = value.getLater(Arrays.asList("C", "D")); - assertNull(later.get("C")); - assertThat(later.get("D"), equalTo(4)); - assertThat(value.iterateLater().iterate(), Matchers.containsInAnyOrder( - MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + assertThat(value.get("B").readLater().read(), equalTo(2)); + assertNull(value.get("A").readLater().read()); + assertThat( + value.entries().readLater().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); // clear value.clear(); - assertThat(value.iterate(), Matchers.emptyIterable()); + assertThat(value.entries().read(), Matchers.emptyIterable()); assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value)); } http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 142af32..68c6613 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -201,24 +201,24 @@ public class CopyOnAccessInMemoryStateInternalsTest { StateTag<Object, MapState<String, Integer>> valueTag = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of()); MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag); - assertThat(underlyingValue.iterate(), emptyIterable()); + assertThat(underlyingValue.entries().read(), emptyIterable()); underlyingValue.put("hello", 1); - assertThat(underlyingValue.get("hello"), equalTo(1)); + assertThat(underlyingValue.get("hello").read(), equalTo(1)); CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag); - assertThat(copyOnAccessState.get("hello"), equalTo(1)); + assertThat(copyOnAccessState.get("hello").read(), equalTo(1)); copyOnAccessState.put("world", 4); - assertThat(copyOnAccessState.get("hello"), equalTo(1)); - assertThat(copyOnAccessState.get("world"), equalTo(4)); - assertThat(underlyingValue.get("hello"), equalTo(1)); - assertNull(underlyingValue.get("world")); + assertThat(copyOnAccessState.get("hello").read(), equalTo(1)); + assertThat(copyOnAccessState.get("world").read(), equalTo(4)); + assertThat(underlyingValue.get("hello").read(), equalTo(1)); + assertNull(underlyingValue.get("world").read()); MapState<String, Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag); - assertThat(underlyingValue.iterate(), equalTo(reReadUnderlyingValue.iterate())); + assertThat(underlyingValue.entries().read(), equalTo(reReadUnderlyingValue.entries().read())); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java index 85d99d6..fb7e807 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java @@ -30,21 +30,20 @@ import java.util.Map; public interface MapState<K, V> extends State { /** - * Returns the value to which the specified key is mapped in the state. - */ - V get(K key); - - /** * Associates the specified value with the specified key in this state. */ void put(K key, V value); /** - * If the specified key is not already associated with a value (or is mapped - * to {@code null}) associates it with the given value and returns - * {@code null}, else returns the current value. + * A deferred read-followed-by-write. + * + * <p>When {@code read()} is called on the result or state is committed, it forces a read of the + * map and reconciliation with any pending modifications. + * + * <p>If the specified key is not already associated with a value (or is mapped to {@code null}) + * associates it with the given value and returns {@code null}, else returns the current value. */ - V putIfAbsent(K key, V value); + ReadableState<V> putIfAbsent(K key, V value); /** * Removes the mapping for a key from this map if it is present. @@ -52,42 +51,29 @@ public interface MapState<K, V> extends State { void remove(K key); /** - * A bulk get. - * @param keys the keys to search for - * @return a iterable view of values, maybe some values is null. - * The order of values corresponds to the order of the keys. + * A deferred lookup. + * + * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()} + * on the results. + * + * <p>When {@code read()} is called, a particular state implementation is encouraged to perform + * all pending reads in a single batch. */ - Iterable<V> get(Iterable<K> keys); - - /** - * Indicate that specified key will be read later. - */ - MapState<K, V> getLater(K k); - - /** - * Indicate that specified batch keys will be read later. - */ - MapState<K, V> getLater(Iterable<K> keys); + ReadableState<V> get(K key); /** * Returns a iterable view of the keys contained in this map. */ - Iterable<K> keys(); + ReadableState<Iterable<K>> keys(); /** * Returns a iterable view of the values contained in this map. */ - Iterable<V> values(); - - /** - * Indicate that all key-values will be read later. - */ - MapState<K, V> iterateLater(); + ReadableState<Iterable<V>> values(); /** * Returns a iterable view of all key-values. */ - Iterable<Map.Entry<K, V>> iterate(); - + ReadableState<Iterable<Map.Entry<K, V>>> entries(); } http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java new file mode 100644 index 0000000..819eda6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Utilities for constructing and manipulating {@link ReadableState} instances. + */ +@Experimental(Kind.STATE) +public class ReadableStates { + + /** + * A {@link ReadableState} constructed from a constant value, hence immediately available. + */ + public static <T> ReadableState<T> immediate(final T value) { + return new ReadableState<T>() { + @Override + public T read() { + return value; + } + + @Override + public ReadableState<T> readLater() { + return this; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java index 5c907d5..56ea510 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java @@ -27,45 +27,19 @@ public interface SetState<T> extends GroupingState<T, Iterable<T>> { /** * Returns true if this set contains the specified element. */ - boolean contains(T t); + ReadableState<Boolean> contains(T t); /** - * Add a value to the buffer if it is not already present. - * If this set already contains the element, the call leaves the set - * unchanged and returns false. + * Ensures a value is a member of the set, returning {@code true} if it was added and {@code + * false} otherwise. */ - boolean addIfAbsent(T t); + ReadableState<Boolean> addIfAbsent(T t); /** * Removes the specified element from this set if it is present. */ void remove(T t); - /** - * Indicate that elements will be read later. - * @param elements to be read later - * @return this for convenient chaining - */ - SetState<T> readLater(Iterable<T> elements); - - /** - * <p>Checks if SetState contains any given elements.</p> - * - * @param elements the elements to search for - * @return the {@code true} if any of the elements are found, - * {@code false} if no match - */ - boolean containsAny(Iterable<T> elements); - - /** - * <p>Checks if SetState contains all given elements.</p> - * - * @param elements the elements to find - * @return true if the SetState contains all elements, - * false if not - */ - boolean containsAll(Iterable<T> elements); - @Override SetState<T> readLater(); } http://git-wip-us.apache.org/repos/asf/beam/blob/604be670/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index e305da1..b429eab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2228,7 +2228,7 @@ public class ParDoTest implements Serializable { state.put(value.getKey(), value.getValue()); count.add(1); if (count.read() >= 4) { - Iterable<Map.Entry<String, Integer>> iterate = state.iterate(); + Iterable<Map.Entry<String, Integer>> iterate = state.entries().read(); for (Map.Entry<String, Integer> entry : iterate) { c.output(KV.of(entry.getKey(), entry.getValue())); } @@ -2274,7 +2274,7 @@ public class ParDoTest implements Serializable { state.put(value.getKey(), new MyInteger(value.getValue())); count.add(1); if (count.read() >= 4) { - Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate(); + Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read(); for (Map.Entry<String, MyInteger> entry : iterate) { c.output(KV.of(entry.getKey(), entry.getValue())); } @@ -2320,7 +2320,7 @@ public class ParDoTest implements Serializable { state.put(value.getKey(), new MyInteger(value.getValue())); count.add(1); if (count.read() >= 4) { - Iterable<Map.Entry<String, MyInteger>> iterate = state.iterate(); + Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read(); for (Map.Entry<String, MyInteger> entry : iterate) { c.output(KV.of(entry.getKey(), entry.getValue())); }
