This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c84ccd385c2 callStateMap was accessed from multiple threads without 
synchronization. changing callStateMap to concurrent hashmap (#36886)
c84ccd385c2 is described below

commit c84ccd385c2f55568708da754e9bf446313f3d95
Author: RadosÅ‚aw Stankiewicz <[email protected]>
AuthorDate: Mon Nov 24 21:23:34 2025 +0100

    callStateMap was accessed from multiple threads without synchronization. 
changing callStateMap to concurrent hashmap (#36886)
---
 .../org/apache/beam/sdk/transforms/ParDoLifecycleTest.java    | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 02d67f5261f..21b4f64f924 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -32,9 +32,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,7 +293,7 @@ public class ParDoLifecycleTest implements Serializable {
 
   @Before
   public void setup() {
-    ExceptionThrowingFn.callStateMap = new HashMap<>();
+    ExceptionThrowingFn.callStateMap.clear();
     ExceptionThrowingFn.exceptionWasThrown.set(false);
   }
 
@@ -356,7 +356,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
-    static HashMap<Integer, DelayedCallStateTracker> callStateMap = new 
HashMap<>();
+    static Map<Integer, DelayedCallStateTracker> callStateMap = new 
ConcurrentHashMap<>();
     // exception is not necessarily thrown on every instance. But we expect at 
least
     // one during tests
     static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
@@ -373,7 +373,10 @@ public class ParDoLifecycleTest implements Serializable {
       Map<Integer, DelayedCallStateTracker> callStates;
       synchronized (ExceptionThrowingFn.class) {
         callStates =
-            (Map<Integer, DelayedCallStateTracker>) 
ExceptionThrowingFn.callStateMap.clone();
+            (Map<Integer, DelayedCallStateTracker>)
+                Collections.synchronizedMap(
+                    ExceptionThrowingFn.callStateMap.entrySet().stream()
+                        .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue())));
       }
       assertThat(callStates, is(not(anEmptyMap())));
       // assert that callStateMap contains only TEARDOWN as a value. Note: We 
do not expect

Reply via email to