This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 61b8f416e61 Changes multimap entries Iterable to make a deep copy of 
pending adds and deletes (#36759)
61b8f416e61 is described below

commit 61b8f416e61d81dcc37aafbdf55f0ecbd6ec9b23
Author: Andrew Crites <[email protected]>
AuthorDate: Thu Nov 13 11:08:50 2025 -0800

    Changes multimap entries Iterable to make a deep copy of pending adds and 
deletes (#36759)
---
 .../apache/beam/fn/harness/state/MultimapUserState.java  | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 8e3d76f5fc8..83d78ff836c 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -47,8 +47,6 @@ import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -273,7 +271,12 @@ public class MultimapUserState<K, V> {
         keysStateRequest.getStateKey());
     // Make a deep copy of pendingAdds so this iterator represents a snapshot 
of state at the time
     // it was created.
-    Map<Object, KV<K, List<V>>> pendingAddsNow = 
ImmutableMap.copyOf(pendingAdds);
+    Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>();
+    for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+      pendingAddsNow.put(
+          entry.getKey(),
+          KV.of(entry.getValue().getKey(), new 
ArrayList<>(entry.getValue().getValue())));
+    }
     if (isCleared) {
       return PrefetchableIterables.maybePrefetchable(
           Iterables.concat(
@@ -285,7 +288,12 @@ public class MultimapUserState<K, V> {
                           value -> 
Maps.immutableEntry(entry.getValue().getKey(), value)))));
     }
 
-    Set<Object> pendingRemovesNow = 
ImmutableSet.copyOf(pendingRemoves.keySet());
+    // Make a deep copy of pendingRemoves so this iterator represents a 
snapshot of state at the
+    // time it was created.
+    Set<Object> pendingRemovesNow = new HashSet<>();
+    for (Object key : pendingRemoves.keySet()) {
+      pendingRemovesNow.add(key);
+    }
     return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
       @Override
       public PrefetchableIterator<Map.Entry<K, V>> createIterator() {

Reply via email to