BewareMyPower commented on code in PR #23217:
URL: https://github.com/apache/pulsar/pull/23217#discussion_r1732187003


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -873,17 +857,7 @@ private void preallocateBundle(String bundle, String 
broker) {
 
         final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
         final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
-                brokerToNamespaceToBundleRange
-                        .computeIfAbsent(broker,
-                                k -> ConcurrentOpenHashMap.<String,
-                                        
ConcurrentOpenHashSet<String>>newBuilder()
-                                        .build());
-        synchronized (namespaceToBundleRange) {
-            namespaceToBundleRange.computeIfAbsent(namespaceName,
-                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
-                    .add(bundleRange);
-        }
+        brokerToNamespaceToBundleRange.addBundleRange(broker, namespaceName, 
bundleRange);
     }

Review Comment:
   Yes. The original synchronized block is added just because this place:
   
   
https://github.com/apache/pulsar/blob/1c495e190b3c569e9dfd44acef2a697c93a1f771/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java#L591-L595
   
   The code above has 1 `clear` call and multiple `computeIfAbsent` calls so 
it's synchronized. So even though `namespaceToBundleRange.computeIfAbsent` 
itself is thread safe, this call should also be synchronized with the code 
above before.
   
   For example,
   
   Thread 1:
   
   ```java
               synchronized (namespaceToBundleRange) {
                   namespaceToBundleRange.clear(); // A
                   
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), 
namespaceToBundleRange); // B
                   
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(),  
namespaceToBundleRange); // C
               }
   ```
   
   Thread 2:
   
   ```java
           synchronized (namespaceToBundleRange) {
               namespaceToBundleRange.computeIfAbsent(namespaceName,
                       k -> ConcurrentOpenHashSet.<String>newBuilder().build())
                       .add(bundleRange); // D
           }
   ```
   
   Since the map is a concurrent hash map, all A,B,C,D are thread safe with a 
strong happens-before relationship. However, without the `synchronized` in 
thread 2, the order could be `A -> B -> D -> C` or `A -> D -> B -> C`. The 
`synchronized` in thread 2 guarantees the order must be `A -> B -> C -> D` or 
`D -> A -> B -> C`.
   
   However, `reloadFromBundles` gets over the issue by pre-constructing a map 
before and using a single `put` call to guarantee the thread safety.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to