This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c84ccd385c2 callStateMap was accessed from multiple threads without
synchronization. changing callStateMap to concurrent hashmap (#36886)
c84ccd385c2 is described below
commit c84ccd385c2f55568708da754e9bf446313f3d95
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Mon Nov 24 21:23:34 2025 +0100
callStateMap was accessed from multiple threads without synchronization.
changing callStateMap to concurrent hashmap (#36886)
---
.../org/apache/beam/sdk/transforms/ParDoLifecycleTest.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 02d67f5261f..21b4f64f924 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -32,9 +32,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,7 +293,7 @@ public class ParDoLifecycleTest implements Serializable {
@Before
public void setup() {
- ExceptionThrowingFn.callStateMap = new HashMap<>();
+ ExceptionThrowingFn.callStateMap.clear();
ExceptionThrowingFn.exceptionWasThrown.set(false);
}
@@ -356,7 +356,7 @@ public class ParDoLifecycleTest implements Serializable {
}
private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
- static HashMap<Integer, DelayedCallStateTracker> callStateMap = new HashMap<>();
+ static Map<Integer, DelayedCallStateTracker> callStateMap = new ConcurrentHashMap<>();
// exception is not necessarily thrown on every instance. But we expect at least
// one during tests
static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
@@ -373,7 +373,10 @@ public class ParDoLifecycleTest implements Serializable {
Map<Integer, DelayedCallStateTracker> callStates;
synchronized (ExceptionThrowingFn.class) {
callStates =
- (Map<Integer, DelayedCallStateTracker>) ExceptionThrowingFn.callStateMap.clone();
+ (Map<Integer, DelayedCallStateTracker>)
+ Collections.synchronizedMap(
+ ExceptionThrowingFn.callStateMap.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
}
assertThat(callStates, is(not(anEmptyMap())));
// assert that callStateMap contains only TEARDOWN as a value. Note: We
do not expect