rionmonster commented on code in PR #27459:
URL: https://github.com/apache/flink/pull/27459#discussion_r2721084658


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageResourceRegistry.java:
##########
@@ -20,19 +20,19 @@
 
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageDataIdentifier;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * A registry that maintains local or remote resources that correspond to a certain set of data in
  * the Tiered Storage.
  */
 public class TieredStorageResourceRegistry {
 
-    private final Map<TieredStorageDataIdentifier, List<TieredStorageResource>>
-            registeredResources = new HashMap<>();
+    private final ConcurrentHashMap<
+                    TieredStorageDataIdentifier, CopyOnWriteArrayList<TieredStorageResource>>

Review Comment:
   That's a good question! 
   
   I thought the same when I initially applied the fix (i.e., only swapping out the outer map for its thread-safe counterpart), but realized the tests that were added would still fail.
   
   `ConcurrentHashMap` makes the map operations themselves thread-safe, but _not_ the values stored in the map. Multiple threads can still obtain the same non-thread-safe list and mutate it concurrently, which leads to inconsistent state:
   
   ```
   registeredResources
         .computeIfAbsent(owner, (ignore) -> new ArrayList<>())
         // Concurrent callers may obtain the same list from the thread-safe map,
         // but the ArrayList itself is not thread-safe
         .add(tieredStorageResource);
   ```
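   A minimal, self-contained sketch of the fixed pattern, assuming hypothetical `String` keys and values in place of `TieredStorageDataIdentifier` and `TieredStorageResource` (the class `RegistrySketch` and its methods are illustrative, not Flink code). Many threads register under the same owner at once, and with the thread-safe inner list every registration is kept:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TieredStorageResourceRegistry: String keys and values
// replace TieredStorageDataIdentifier and TieredStorageResource.
public class RegistrySketch {

    private final ConcurrentHashMap<String, CopyOnWriteArrayList<String>> registeredResources =
            new ConcurrentHashMap<>();

    public void register(String owner, String resource) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so every caller for the same
        // owner gets the same list instance; CopyOnWriteArrayList.add is itself thread-safe.
        registeredResources
                .computeIfAbsent(owner, ignore -> new CopyOnWriteArrayList<>())
                .add(resource);
    }

    public int count(String owner) {
        List<String> resources = registeredResources.get(owner);
        return resources == null ? 0 : resources.size();
    }

    // Registers threads * perThread resources under a single owner from several threads.
    public static int registerConcurrently(int threads, int perThread)
            throws InterruptedException {
        RegistrySketch registry = new RegistrySketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(
                    () -> {
                        try {
                            start.await(); // release all workers together to maximize contention
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        for (int i = 0; i < perThread; i++) {
                            registry.register("same-owner", id + "-" + i);
                        }
                    });
        }
        start.countDown();
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return registry.count("same-owner");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(registerConcurrently(8, 1000)); // prints 8000
    }
}
```

   Swapping the inner `CopyOnWriteArrayList` in this sketch for a plain `ArrayList` can, nondeterministically, lose entries or throw from inside `add`, which is the class of failure the new tests are guarding against.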
   
   Without thread-safety on the list itself, many of the existing tests can fail with `ConcurrentModificationException`, `NullPointerException`, or lost entries (which `testConcurrentRegisterWithSameIdentifier` specifically checks for). Switching to `CopyOnWriteArrayList` (or another thread-safe collection such as `Collections.synchronizedList()`) makes the behavior consistent.
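   For comparison, a hedged sketch of the `Collections.synchronizedList()` alternative, again with hypothetical `String` types and an illustrative class name. Individual `add` calls are safe, but per the `Collections.synchronizedList` Javadoc, callers must hold the list's lock while iterating:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical variant of the registry using Collections.synchronizedList
// instead of CopyOnWriteArrayList for the per-owner values.
public class SynchronizedListRegistrySketch {

    private final ConcurrentHashMap<String, List<String>> registeredResources =
            new ConcurrentHashMap<>();

    public void register(String owner, String resource) {
        // Each add() acquires the synchronized wrapper's own monitor.
        registeredResources
                .computeIfAbsent(owner, ignore -> Collections.synchronizedList(new ArrayList<>()))
                .add(resource);
    }

    // Returns a copy of the owner's resources; traversal holds the list's lock,
    // as required when iterating a synchronizedList.
    public List<String> snapshot(String owner) {
        List<String> resources = registeredResources.get(owner);
        if (resources == null) {
            return Collections.emptyList();
        }
        List<String> copy = new ArrayList<>();
        synchronized (resources) {
            for (String resource : resources) {
                copy.add(resource);
            }
        }
        return copy;
    }
}
```

   The trade-off: `CopyOnWriteArrayList` copies its backing array on every write, which suits small, read-mostly lists and gives lock-free snapshot iteration, while `synchronizedList` avoids the copies but puts the locking burden on every reader that traverses the list.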


