BewareMyPower commented on code in PR #23217:
URL: https://github.com/apache/pulsar/pull/23217#discussion_r1732192436


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -873,17 +857,7 @@ private void preallocateBundle(String bundle, String 
broker) {
 
         final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
         final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
-                brokerToNamespaceToBundleRange
-                        .computeIfAbsent(broker,
-                                k -> ConcurrentOpenHashMap.<String,
-                                        
ConcurrentOpenHashSet<String>>newBuilder()
-                                        .build());
-        synchronized (namespaceToBundleRange) {
-            namespaceToBundleRange.computeIfAbsent(namespaceName,
-                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
-                    .add(bundleRange);
-        }
+        brokerToNamespaceToBundleRange.addBundleRange(broker, namespaceName, 
bundleRange);
     }

Review Comment:
   Thread 1:
   
   ```java
       public void reloadFromBundles(String broker, Stream<String> bundles) {
           final var namespaceToBundleRange = new ConcurrentHashMap<String, 
Set<String>>();
           bundles.forEach(bundleName -> {
               final String namespace = 
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
               final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
               initConcurrentHashSet(namespaceToBundleRange, 
namespace).add(bundleRange);
           }); // A (construct the map)
           data.put(broker, namespaceToBundleRange); // B
       }
   ```
   
   Thread 2:
   
   ```java
       public void addBundleRange(String broker, String namespace, String 
bundleRange) {
           getBundleRangeSet(broker, namespace).add(bundleRange); // C
       }
   ```
   
   With the current design, there are two cases:
   1. `B -> C`, then C will read the cache generated from A and add the 
bundleRange to it.
   2. `C -> B`, then C will add the bundle range to the old cache, which will 
be replaced by the cache generated from A via B.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to