This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 325c6a58d53 [fix][broker] Fix thread unsafe access on the bundle range cache for load manager (#23217)
325c6a58d53 is described below

commit 325c6a58d53b9e7b4fe31883ec47ae12c5abc71f
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Aug 28 19:26:45 2024 +0800

    [fix][broker] Fix thread unsafe access on the bundle range cache for load manager (#23217)
---
 .../broker/loadbalance/impl/BundleRangeCache.java  | 84 ++++++++++++++++++++++
 .../broker/loadbalance/impl/LoadManagerShared.java | 73 +++++--------------
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 40 ++---------
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 33 ++-------
 .../AntiAffinityNamespaceGroupTest.java            | 33 +++------
 .../loadbalance/impl/LoadManagerSharedTest.java    | 45 ++++--------
 6 files changed, 135 insertions(+), 173 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
new file mode 100644
index 00000000000..5cb92682232
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+/**
+ * The cache for the bundle ranges.
+ * The first key is the broker id and the second key is the namespace name, the value is the set of bundle ranges of
+ * that namespace. When the broker key is accessed if the associated value is not present, an empty map will be created
+ * as the initial value that will never be removed.
+ * Therefore, for each broker, there could only be one internal map during the whole lifetime. Then it will be safe
+ * to apply the synchronized key word on the value for thread safe operations.
+ */
+public class BundleRangeCache {
+
+    // Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
+    // Used to distribute bundles within a namespace evenly across brokers.
+    private final Map<String, Map<String, Set<String>>> data = new 
ConcurrentHashMap<>();
+
+    public void reloadFromBundles(String broker, Stream<String> bundles) {
+        final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> 
new HashMap<>());
+        synchronized (namespaceToBundleRange) {
+            namespaceToBundleRange.clear();
+            bundles.forEach(bundleName -> {
+                final String namespace = 
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+                final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+                namespaceToBundleRange.computeIfAbsent(namespace, __ -> new 
HashSet<>()).add(bundleRange);
+            });
+        }
+    }
+
+    public void add(String broker, String namespace, String bundleRange) {
+        final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> 
new HashMap<>());
+        synchronized (namespaceToBundleRange) {
+            namespaceToBundleRange.computeIfAbsent(namespace, __ -> new 
HashSet<>()).add(bundleRange);
+        }
+    }
+
+    public int getBundleRangeCount(String broker, String namespace) {
+        final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> 
new HashMap<>());
+        synchronized (namespaceToBundleRange) {
+            final var bundleRangeSet = namespaceToBundleRange.get(namespace);
+            return bundleRangeSet != null ? bundleRangeSet.size() : 0;
+        }
+    }
+
+    /**
+     * Get the map whose key is the broker and value is the namespace that has at least 1 cached bundle range.
+     */
+    public Map<String, List<String>> getBrokerToNamespacesMap() {
+        final var brokerToNamespaces = new HashMap<String, List<String>>();
+        for (var entry : data.entrySet()) {
+            final var broker = entry.getKey();
+            final var namespaceToBundleRange = entry.getValue();
+            synchronized (namespaceToBundleRange) {
+                brokerToNamespaces.put(broker, 
namespaceToBundleRange.keySet().stream().toList());
+            }
+        }
+        return brokerToNamespaces;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 3d627db6cfa..7ca2b926db7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -46,8 +46,6 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -282,24 +280,6 @@ public class LoadManagerShared {
             return brokerCandidateCache;
         });
     }
-    /**
-     * Using the given bundles, populate the namespace to bundle range map.
-     *
-     * @param bundles
-     *            Bundles with which to populate.
-     * @param target
-     *            Map to fill.
-     */
-    public static void fillNamespaceToBundlesMap(final Set<String> bundles,
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
target) {
-        bundles.forEach(bundleName -> {
-            final String namespaceName = 
getNamespaceNameFromBundleName(bundleName);
-            final String bundleRange = 
getBundleRangeFromBundleName(bundleName);
-            target.computeIfAbsent(namespaceName,
-                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
-                    .add(bundleRange);
-        });
-    }
 
     // From a full bundle name, extract the bundle range.
     public static String getBundleRangeFromBundleName(String bundleName) {
@@ -359,8 +339,7 @@ public class LoadManagerShared {
     public static void removeMostServicingBrokersForNamespace(
             final String assignedBundleName,
             final Set<String> candidates,
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-                    brokerToNamespaceToBundleRange) {
+            final BundleRangeCache brokerToNamespaceToBundleRange) {
         if (candidates.isEmpty()) {
             return;
         }
@@ -369,13 +348,7 @@ public class LoadManagerShared {
         int leastBundles = Integer.MAX_VALUE;
 
         for (final String broker : candidates) {
-            int bundles = (int) brokerToNamespaceToBundleRange
-                    .computeIfAbsent(broker,
-                            k -> ConcurrentOpenHashMap.<String,
-                                    
ConcurrentOpenHashSet<String>>newBuilder().build())
-                    .computeIfAbsent(namespaceName,
-                            k -> 
ConcurrentOpenHashSet.<String>newBuilder().build())
-                    .size();
+            int bundles = 
brokerToNamespaceToBundleRange.getBundleRangeCount(broker, namespaceName);
             leastBundles = Math.min(leastBundles, bundles);
             if (leastBundles == 0) {
                 break;
@@ -386,13 +359,8 @@ public class LoadManagerShared {
         // `leastBundles` may differ from the actual value.
 
         final int finalLeastBundles = leastBundles;
-        candidates.removeIf(
-                broker -> 
brokerToNamespaceToBundleRange.computeIfAbsent(broker,
-                        k -> ConcurrentOpenHashMap.<String,
-                                
ConcurrentOpenHashSet<String>>newBuilder().build())
-                        .computeIfAbsent(namespaceName,
-                                k -> 
ConcurrentOpenHashSet.<String>newBuilder().build())
-                        .size() > finalLeastBundles);
+        candidates.removeIf(broker ->
+                brokerToNamespaceToBundleRange.getBundleRangeCount(broker, 
namespaceName) > finalLeastBundles);
     }
 
     /**
@@ -426,8 +394,7 @@ public class LoadManagerShared {
     public static void filterAntiAffinityGroupOwnedBrokers(
             final PulsarService pulsar, final String assignedBundleName,
             final Set<String> candidates,
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-                    brokerToNamespaceToBundleRange,
+            final BundleRangeCache brokerToNamespaceToBundleRange,
             Map<String, String> brokerToDomainMap) {
         if (candidates.isEmpty()) {
             return;
@@ -572,8 +539,7 @@ public class LoadManagerShared {
      */
     public static CompletableFuture<Map<String, Integer>> 
getAntiAffinityNamespaceOwnedBrokers(
             final PulsarService pulsar, final String namespaceName,
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-                    brokerToNamespaceToBundleRange) {
+            final BundleRangeCache brokerToNamespaceToBundleRange) {
 
         CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = 
new CompletableFuture<>();
         getNamespaceAntiAffinityGroupAsync(pulsar, namespaceName)
@@ -584,21 +550,16 @@ public class LoadManagerShared {
             }
             final String antiAffinityGroup = antiAffinityGroupOptional.get();
             final Map<String, Integer> brokerToAntiAffinityNamespaceCount = 
new ConcurrentHashMap<>();
-            final List<CompletableFuture<Void>> futures = new ArrayList<>();
-            brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) 
-> {
-                nsToBundleRange.forEach((ns, bundleRange) -> {
-                    if (bundleRange.isEmpty()) {
-                        return;
-                    }
-
-                    CompletableFuture<Void> future = new CompletableFuture<>();
-                    futures.add(future);
-                    countAntiAffinityNamespaceOwnedBrokers(broker, ns, future,
+            final var brokerToNamespaces = 
brokerToNamespaceToBundleRange.getBrokerToNamespacesMap();
+            
FutureUtil.waitForAll(brokerToNamespaces.entrySet().stream().flatMap(e -> {
+                final var broker = e.getKey();
+                return e.getValue().stream().map(namespace -> {
+                    final var future = new CompletableFuture<Void>();
+                    countAntiAffinityNamespaceOwnedBrokers(broker, namespace, 
future,
                             pulsar, antiAffinityGroup, 
brokerToAntiAffinityNamespaceCount);
+                    return future;
                 });
-            });
-            FutureUtil.waitForAll(futures)
-                    .thenAccept(r -> 
antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
+            }).toList()).thenAccept(__ -> 
antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
         }).exceptionally(ex -> {
             // namespace-policies has not been created yet
             antiAffinityNsBrokersResult.complete(null);
@@ -698,7 +659,6 @@ public class LoadManagerShared {
      * by different broker.
      *
      * @param namespace
-     * @param bundle
      * @param currentBroker
      * @param pulsar
      * @param brokerToNamespaceToBundleRange
@@ -707,10 +667,9 @@ public class LoadManagerShared {
      * @throws Exception
      */
     public static boolean shouldAntiAffinityNamespaceUnload(
-            String namespace, String bundle, String currentBroker,
+            String namespace, String currentBroker,
             final PulsarService pulsar,
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-                    brokerToNamespaceToBundleRange,
+            final BundleRangeCache brokerToNamespaceToBundleRange,
             Set<String> candidateBrokers) throws Exception {
 
         Map<String, Integer> brokerNamespaceCount = 
getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 05c984d0349..48a6121b9dd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
@@ -72,8 +73,6 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -116,10 +115,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // Broker host usage object used to calculate system resource usage.
     private BrokerHostUsage brokerHostUsage;
 
-    // Map from brokers to namespaces to the bundle ranges in that namespace 
assigned to that broker.
-    // Used to distribute bundles within a namespace evenly across brokers.
-    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-            brokerToNamespaceToBundleRange;
+    private final BundleRangeCache brokerToNamespaceToBundleRange = new 
BundleRangeCache();
 
     // Path to the ZNode containing the LocalBrokerData json for this broker.
     private String brokerZnodePath;
@@ -199,10 +195,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
      */
     public ModularLoadManagerImpl() {
         brokerCandidateCache = new HashSet<>();
-        brokerToNamespaceToBundleRange =
-                ConcurrentOpenHashMap.<String,
-                        ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>newBuilder()
-                        .build();
         defaultStats = new NamespaceBundleStats();
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
@@ -582,17 +574,9 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
             TimeAverageBrokerData timeAverageData = new 
TimeAverageBrokerData();
             timeAverageData.reset(statsMap.keySet(), bundleData, defaultStats);
             brokerData.setTimeAverageData(timeAverageData);
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
-                    brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker, k ->
-                                    ConcurrentOpenHashMap.<String,
-                                            
ConcurrentOpenHashSet<String>>newBuilder()
-                                            .build());
-            synchronized (namespaceToBundleRange) {
-                namespaceToBundleRange.clear();
-                LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), 
namespaceToBundleRange);
-                
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), 
namespaceToBundleRange);
-            }
+
+            brokerToNamespaceToBundleRange.reloadFromBundles(broker,
+                    Stream.of(statsMap.keySet(), 
preallocatedBundleData.keySet()).flatMap(Collection::stream));
         }
 
         // Remove not active bundle from loadData
@@ -736,7 +720,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                         .getBundle(namespace, bundle);
                 LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
                         getAvailableBrokers(), brokerTopicLoadingPredicate);
-                return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, 
currentBroker, pulsar,
+                return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker, 
pulsar,
                         brokerToNamespaceToBundleRange, brokerCandidateCache);
             }
 
@@ -873,17 +857,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 
         final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
         final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
-                brokerToNamespaceToBundleRange
-                        .computeIfAbsent(broker,
-                                k -> ConcurrentOpenHashMap.<String,
-                                        
ConcurrentOpenHashSet<String>>newBuilder()
-                                        .build());
-        synchronized (namespaceToBundleRange) {
-            namespaceToBundleRange.computeIfAbsent(namespaceName,
-                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
-                    .add(bundleRange);
-        }
+        brokerToNamespaceToBundleRange.add(broker, namespaceName, bundleRange);
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index be0580808ca..30a7359ce0e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -47,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -62,8 +64,6 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -107,10 +107,7 @@ public class SimpleLoadManagerImpl implements LoadManager, 
Consumer<Notification
     private final Set<String> bundleGainsCache;
     private final Set<String> bundleLossesCache;
 
-    // Map from brokers to namespaces to the bundle ranges in that namespace 
assigned to that broker.
-    // Used to distribute bundles within a namespace evenly across brokers.
-    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
-            ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
+    private final BundleRangeCache brokerToNamespaceToBundleRange = new 
BundleRangeCache();
 
     // CPU usage per msg/sec
     private double realtimeCpuLoadFactor = 0.025;
@@ -205,10 +202,6 @@ public class SimpleLoadManagerImpl implements LoadManager, 
Consumer<Notification
         bundleLossesCache = new HashSet<>();
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
-        brokerToNamespaceToBundleRange =
-                ConcurrentOpenHashMap.<String,
-                        ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>newBuilder()
-                        .build();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerId) {
@@ -853,14 +846,7 @@ public class SimpleLoadManagerImpl implements LoadManager, 
Consumer<Notification
                 ResourceQuota quota = this.getResourceQuota(serviceUnitId);
                 // Add preallocated bundle range so incoming bundles from the 
same namespace are not assigned to the
                 // same broker.
-                brokerToNamespaceToBundleRange
-                        .computeIfAbsent(selectedRU.getResourceId(),
-                                k -> ConcurrentOpenHashMap.<String,
-                                        
ConcurrentOpenHashSet<String>>newBuilder()
-                                        .build())
-                        .computeIfAbsent(namespaceName, k ->
-                                
ConcurrentOpenHashSet.<String>newBuilder().build())
-                        .add(bundleRange);
+                brokerToNamespaceToBundleRange.add(selectedRU.getResourceId(), 
namespaceName, bundleRange);
                 ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
                 resourceUnitRankings.put(selectedRU, ranking);
             }
@@ -1272,15 +1258,8 @@ public class SimpleLoadManagerImpl implements 
LoadManager, Consumer<Notification
             final String broker = resourceUnit.getResourceId();
             final Set<String> loadedBundles = ranking.getLoadedBundles();
             final Set<String> preallocatedBundles = 
resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
-            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
-                    brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker,
-                                    k -> ConcurrentOpenHashMap.<String,
-                                            
ConcurrentOpenHashSet<String>>newBuilder()
-                                            .build());
-            namespaceToBundleRange.clear();
-            LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, 
namespaceToBundleRange);
-            LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, 
namespaceToBundleRange);
+            brokerToNamespaceToBundleRange.reloadFromBundles(broker,
+                    Stream.of(loadedBundles, 
preallocatedBundles).flatMap(Collection::stream));
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index fc2fec96294..f1e462c4ec7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.impl.BundleRangeCache;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
@@ -59,8 +60,6 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
@@ -167,12 +166,10 @@ public class AntiAffinityNamespaceGroupTest extends 
MockedPulsarServiceBaseTest
         }
     }
 
-
     protected Object getBundleOwnershipData(){
-        return ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>newBuilder().build();
+        return new BundleRangeCache();
     }
 
-
     protected String getLoadManagerClassName() {
         return ModularLoadManagerImpl.class.getName();
     }
@@ -366,17 +363,8 @@ public class AntiAffinityNamespaceGroupTest extends 
MockedPulsarServiceBaseTest
             Object ownershipData,
             String broker, String namespace, String assignedBundleName) {
 
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>
-                brokerToNamespaceToBundleRange =
-                (ConcurrentOpenHashMap<String,
-                        ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>) ownershipData;
-        ConcurrentOpenHashSet<String> bundleSet =
-                ConcurrentOpenHashSet.<String>newBuilder().build();
-        bundleSet.add(assignedBundleName);
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
nsToBundleMap =
-                ConcurrentOpenHashMap.<String, 
ConcurrentOpenHashSet<String>>newBuilder().build();
-        nsToBundleMap.put(namespace, bundleSet);
-        brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
+        final var brokerToNamespaceToBundleRange = (BundleRangeCache) 
ownershipData;
+        brokerToNamespaceToBundleRange.add(broker, namespace, 
assignedBundleName);
     }
 
     /**
@@ -562,10 +550,9 @@ public class AntiAffinityNamespaceGroupTest extends 
MockedPulsarServiceBaseTest
         if (ownershipData instanceof Set) {
             LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, 
assignedNamespace, brokers,
                     (Set<Map.Entry<String, ServiceUnitStateData>>) 
ownershipData, brokerToDomainMap);
-        } else if (ownershipData instanceof ConcurrentOpenHashMap) {
+        } else if (ownershipData instanceof BundleRangeCache) {
             LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, 
assignedNamespace, brokers,
-                    (ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
-                            ownershipData, brokerToDomainMap);
+                    (BundleRangeCache) ownershipData, brokerToDomainMap);
         } else {
             throw new RuntimeException("Unknown ownershipData class type");
         }
@@ -582,11 +569,9 @@ public class AntiAffinityNamespaceGroupTest extends 
MockedPulsarServiceBaseTest
         if (ownershipData instanceof Set) {
             return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
                     currentBroker, pulsar, (Set<Map.Entry<String, 
ServiceUnitStateData>>) ownershipData, candidate);
-        } else if (ownershipData instanceof ConcurrentOpenHashMap) {
-            return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
-                    currentBroker, pulsar,
-                    (ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
-                            ownershipData, candidate);
+        } else if (ownershipData instanceof BundleRangeCache) {
+            return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker, 
pulsar,
+                    (BundleRangeCache) ownershipData, candidate);
         } else {
             throw new RuntimeException("Unknown ownershipData class type");
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
index 8bc097779b0..465e8e2d852 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -18,13 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.Set;
-
-import com.google.common.collect.Sets;
-
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,59 +33,44 @@ public class LoadManagerSharedTest {
         String assignedBundle = namespace + "/0x00000000_0x40000000";
 
         Set<String> candidates = new HashSet<>();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>> map =
-                ConcurrentOpenHashMap.<String,
-                        ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>>newBuilder()
-                        .build();
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        final var cache = new BundleRangeCache();
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 0);
 
         candidates = Sets.newHashSet("broker1");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 1);
         Assert.assertTrue(candidates.contains("broker1"));
 
         candidates = Sets.newHashSet("broker1");
-        fillBrokerToNamespaceToBundleMap(map, "broker1", namespace, 
"0x40000000_0x80000000");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        cache.add("broker1", namespace, "0x40000000_0x80000000");
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 1);
         Assert.assertTrue(candidates.contains("broker1"));
 
         candidates = Sets.newHashSet("broker1", "broker2");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 1);
         Assert.assertTrue(candidates.contains("broker2"));
 
         candidates = Sets.newHashSet("broker1", "broker2");
-        fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, 
"0x80000000_0xc0000000");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        cache.add("broker2", namespace, "0x80000000_0xc0000000");
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 2);
         Assert.assertTrue(candidates.contains("broker1"));
         Assert.assertTrue(candidates.contains("broker2"));
 
         candidates = Sets.newHashSet("broker1", "broker2");
-        fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, 
"0xc0000000_0xd0000000");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        cache.add("broker2", namespace, "0xc0000000_0xd0000000");
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 1);
         Assert.assertTrue(candidates.contains("broker1"));
 
         candidates = Sets.newHashSet("broker1", "broker2", "broker3");
-        fillBrokerToNamespaceToBundleMap(map, "broker3", namespace, 
"0xd0000000_0xffffffff");
-        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, map);
+        cache.add("broker3", namespace, "0xd0000000_0xffffffff");
+        
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, 
candidates, cache);
         Assert.assertEquals(candidates.size(), 2);
         Assert.assertTrue(candidates.contains("broker1"));
         Assert.assertTrue(candidates.contains("broker3"));
     }
-
-    private static void fillBrokerToNamespaceToBundleMap(
-            ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>>> map,
-            String broker, String namespace, String bundle) {
-        map.computeIfAbsent(broker,
-                k -> ConcurrentOpenHashMap.<String,
-                        ConcurrentOpenHashSet<String>>newBuilder().build())
-                .computeIfAbsent(namespace,
-                        k -> 
ConcurrentOpenHashSet.<String>newBuilder().build())
-                .add(bundle);
-    }
-
 }


Reply via email to