Demogorgon314 commented on code in PR #19708:
URL: https://github.com/apache/pulsar/pull/19708#discussion_r1128942918


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -409,22 +413,23 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
             return nsCount != null && nsCount != finalLeastNamespaceCount;
         });
     }
-
+    @FunctionalInterface
+    private interface CountAntiAffinityNamespaceOwnedBrokers{
+        void run(String broker, String namespace);
+    }
     /**
      * It returns map of broker and count of namespace that are belong to the same anti-affinity group as given.
      *
      * @param pulsar
      * @param namespaceName
-     * @param brokerToNamespaceToBundleRange
+     * @param bundleOwnershipData
      * @return
      */
     public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(

Review Comment:
   Can we change `bundleOwnershipData` to `Supplier<Map<String, String>> brokerToNsMapSupplier`?
   ```java
       public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
               final PulsarService pulsar, final String namespaceName,
               Supplier<Map<String, String>> brokerToNsMapSupplier) {

           CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
           pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(namespaceName))
                   .thenAccept(policies -> {
               if (!policies.isPresent() || StringUtils.isBlank(policies.get().namespaceAntiAffinityGroup)) {
                   antiAffinityNsBrokersResult.complete(null);
                   return;
               }
               final String antiAffinityGroup = policies.get().namespaceAntiAffinityGroup;
               final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
               final List<CompletableFuture<Void>> futures = new ArrayList<>();

               Map<String, String> brokerToNsMap = brokerToNsMapSupplier.get();
               brokerToNsMap.forEach((broker, ns) -> {
                   CompletableFuture<Void> future = new CompletableFuture<>();
                   futures.add(future);

                   pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns))
                           .thenAccept(nsPolicies -> {
                               if (nsPolicies.isPresent()
                                       && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().namespaceAntiAffinityGroup)) {
                                   brokerToAntiAffinityNamespaceCount.compute(broker,
                                           (brokerName, count) -> count == null ? 1 : count + 1);
                               }
                               future.complete(null);
                           }).exceptionally(ex -> {
                               future.complete(null);
                               return null;
                           });
               });
               FutureUtil.waitForAll(futures)
                       .thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
           }).exceptionally(ex -> {
               // namespace-policies has not been created yet
               antiAffinityNsBrokersResult.complete(null);
               return null;
           });
           return antiAffinityNsBrokersResult;
       }
   ```
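
   To illustrate the caller side, here is a minimal, self-contained sketch of passing ownership data through a `Supplier`. It is not Pulsar code: the class `BrokerToNsSupplierSketch`, the helper `countGroupNamespaces`, and the in-memory `NS_TO_GROUP` table (standing in for the async local-policies lookup) are all hypothetical. The point it tries to show is that the map is only read when `get()` is called, so each load manager could wrap its own ownership structure without the shared method knowing about it.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;

   public class BrokerToNsSupplierSketch {

       // Hypothetical stand-in for the local-policies lookup: namespace -> anti-affinity group.
       static final Map<String, String> NS_TO_GROUP = Map.of(
               "tenant/ns1", "group-a",
               "tenant/ns2", "group-a",
               "tenant/ns3", "group-b");

       // Counts, per broker, the namespaces that belong to the given anti-affinity group.
       // The ownership map is fetched from the supplier only when this method runs.
       static Map<String, Integer> countGroupNamespaces(
               String antiAffinityGroup, Supplier<Map<String, String>> brokerToNsMapSupplier) {
           Map<String, Integer> counts = new HashMap<>();
           brokerToNsMapSupplier.get().forEach((broker, ns) -> {
               // equalsIgnoreCase(null) is false, so unknown namespaces are skipped.
               if (antiAffinityGroup.equalsIgnoreCase(NS_TO_GROUP.get(ns))) {
                   counts.merge(broker, 1, Integer::sum);
               }
           });
           return counts;
       }

       public static void main(String[] args) {
           Map<String, String> ownership = new HashMap<>();
           ownership.put("broker-1", "tenant/ns1");

           // The supplier defers reading the map until the count is computed.
           Supplier<Map<String, String>> supplier = () -> ownership;

           // Entries added after the supplier was created are still visible to it.
           ownership.put("broker-2", "tenant/ns2");
           ownership.put("broker-3", "tenant/ns3");

           System.out.println(countGroupNamespaces("group-a", supplier));
       }
   }
   ```

   In this sketch `broker-3` owns a namespace from a different group, so it does not appear in the resulting counts.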


