This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 61b8f416e61 Changes multimap entries Iterable to make a deep copy of
pending adds and deletes (#36759)
61b8f416e61 is described below
commit 61b8f416e61d81dcc37aafbdf55f0ecbd6ec9b23
Author: Andrew Crites <[email protected]>
AuthorDate: Thu Nov 13 11:08:50 2025 -0800
Changes multimap entries Iterable to make a deep copy of pending adds and
deletes (#36759)
---
.../apache/beam/fn/harness/state/MultimapUserState.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 8e3d76f5fc8..83d78ff836c 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -47,8 +47,6 @@ import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -273,7 +271,12 @@ public class MultimapUserState<K, V> {
keysStateRequest.getStateKey());
// Make a deep copy of pendingAdds so this iterator represents a snapshot of state at the time
// it was created.
- Map<Object, KV<K, List<V>>> pendingAddsNow =
ImmutableMap.copyOf(pendingAdds);
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>();
+ for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+ pendingAddsNow.put(
+ entry.getKey(),
+ KV.of(entry.getValue().getKey(), new
ArrayList<>(entry.getValue().getValue())));
+ }
if (isCleared) {
return PrefetchableIterables.maybePrefetchable(
Iterables.concat(
@@ -285,7 +288,12 @@ public class MultimapUserState<K, V> {
value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
}
- Set<Object> pendingRemovesNow =
ImmutableSet.copyOf(pendingRemoves.keySet());
+ // Make a deep copy of pendingRemoves so this iterator represents a snapshot of state at the
+ // time it was created.
+ Set<Object> pendingRemovesNow = new HashSet<>();
+ for (Object key : pendingRemoves.keySet()) {
+ pendingRemovesNow.add(key);
+ }
return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
@Override
public PrefetchableIterator<Map.Entry<K, V>> createIterator() {