This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 325c6a58d53 [fix][broker] Fix thread unsafe access on the bundle range
cache for load manager (#23217)
325c6a58d53 is described below
commit 325c6a58d53b9e7b4fe31883ec47ae12c5abc71f
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Aug 28 19:26:45 2024 +0800
[fix][broker] Fix thread unsafe access on the bundle range cache for load
manager (#23217)
---
.../broker/loadbalance/impl/BundleRangeCache.java | 84 ++++++++++++++++++++++
.../broker/loadbalance/impl/LoadManagerShared.java | 73 +++++--------------
.../loadbalance/impl/ModularLoadManagerImpl.java | 40 ++---------
.../loadbalance/impl/SimpleLoadManagerImpl.java | 33 ++-------
.../AntiAffinityNamespaceGroupTest.java | 33 +++------
.../loadbalance/impl/LoadManagerSharedTest.java | 45 ++++--------
6 files changed, 135 insertions(+), 173 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
new file mode 100644
index 00000000000..5cb92682232
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleRangeCache.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+/**
+ * The cache for the bundle ranges.
+ * The first key is the broker id and the second key is the namespace name,
the value is the set of bundle ranges of
+ * that namespace. When the broker key is accessed if the associated value is
not present, an empty map will be created
+ * as the initial value that will never be removed.
+ * Therefore, for each broker, there could only be one internal map during the
whole lifetime. Then it will be safe
+ * to apply the synchronized key word on the value for thread safe operations.
+ */
+public class BundleRangeCache {
+
+ // Map from brokers to namespaces to the bundle ranges in that namespace
assigned to that broker.
+ // Used to distribute bundles within a namespace evenly across brokers.
+ private final Map<String, Map<String, Set<String>>> data = new
ConcurrentHashMap<>();
+
+ public void reloadFromBundles(String broker, Stream<String> bundles) {
+ final var namespaceToBundleRange = data.computeIfAbsent(broker, __ ->
new HashMap<>());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.clear();
+ bundles.forEach(bundleName -> {
+ final String namespace =
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+ namespaceToBundleRange.computeIfAbsent(namespace, __ -> new
HashSet<>()).add(bundleRange);
+ });
+ }
+ }
+
+ public void add(String broker, String namespace, String bundleRange) {
+ final var namespaceToBundleRange = data.computeIfAbsent(broker, __ ->
new HashMap<>());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.computeIfAbsent(namespace, __ -> new
HashSet<>()).add(bundleRange);
+ }
+ }
+
+ public int getBundleRangeCount(String broker, String namespace) {
+ final var namespaceToBundleRange = data.computeIfAbsent(broker, __ ->
new HashMap<>());
+ synchronized (namespaceToBundleRange) {
+ final var bundleRangeSet = namespaceToBundleRange.get(namespace);
+ return bundleRangeSet != null ? bundleRangeSet.size() : 0;
+ }
+ }
+
+ /**
+ * Get the map whose key is the broker and value is the namespace that has
at least 1 cached bundle range.
+ */
+ public Map<String, List<String>> getBrokerToNamespacesMap() {
+ final var brokerToNamespaces = new HashMap<String, List<String>>();
+ for (var entry : data.entrySet()) {
+ final var broker = entry.getKey();
+ final var namespaceToBundleRange = entry.getValue();
+ synchronized (namespaceToBundleRange) {
+ brokerToNamespaces.put(broker,
namespaceToBundleRange.keySet().stream().toList());
+ }
+ }
+ return brokerToNamespaces;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 3d627db6cfa..7ca2b926db7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -46,8 +46,6 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -282,24 +280,6 @@ public class LoadManagerShared {
return brokerCandidateCache;
});
}
- /**
- * Using the given bundles, populate the namespace to bundle range map.
- *
- * @param bundles
- * Bundles with which to populate.
- * @param target
- * Map to fill.
- */
- public static void fillNamespaceToBundlesMap(final Set<String> bundles,
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
target) {
- bundles.forEach(bundleName -> {
- final String namespaceName =
getNamespaceNameFromBundleName(bundleName);
- final String bundleRange =
getBundleRangeFromBundleName(bundleName);
- target.computeIfAbsent(namespaceName,
- k -> ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundleRange);
- });
- }
// From a full bundle name, extract the bundle range.
public static String getBundleRangeFromBundleName(String bundleName) {
@@ -359,8 +339,7 @@ public class LoadManagerShared {
public static void removeMostServicingBrokersForNamespace(
final String assignedBundleName,
final Set<String> candidates,
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange) {
+ final BundleRangeCache brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
}
@@ -369,13 +348,7 @@ public class LoadManagerShared {
int leastBundles = Integer.MAX_VALUE;
for (final String broker : candidates) {
- int bundles = (int) brokerToNamespaceToBundleRange
- .computeIfAbsent(broker,
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder().build())
- .computeIfAbsent(namespaceName,
- k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
- .size();
+ int bundles =
brokerToNamespaceToBundleRange.getBundleRangeCount(broker, namespaceName);
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
@@ -386,13 +359,8 @@ public class LoadManagerShared {
// `leastBundles` may differ from the actual value.
final int finalLeastBundles = leastBundles;
- candidates.removeIf(
- broker ->
brokerToNamespaceToBundleRange.computeIfAbsent(broker,
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder().build())
- .computeIfAbsent(namespaceName,
- k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
- .size() > finalLeastBundles);
+ candidates.removeIf(broker ->
+ brokerToNamespaceToBundleRange.getBundleRangeCount(broker,
namespaceName) > finalLeastBundles);
}
/**
@@ -426,8 +394,7 @@ public class LoadManagerShared {
public static void filterAntiAffinityGroupOwnedBrokers(
final PulsarService pulsar, final String assignedBundleName,
final Set<String> candidates,
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange,
+ final BundleRangeCache brokerToNamespaceToBundleRange,
Map<String, String> brokerToDomainMap) {
if (candidates.isEmpty()) {
return;
@@ -572,8 +539,7 @@ public class LoadManagerShared {
*/
public static CompletableFuture<Map<String, Integer>>
getAntiAffinityNamespaceOwnedBrokers(
final PulsarService pulsar, final String namespaceName,
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange) {
+ final BundleRangeCache brokerToNamespaceToBundleRange) {
CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult =
new CompletableFuture<>();
getNamespaceAntiAffinityGroupAsync(pulsar, namespaceName)
@@ -584,21 +550,16 @@ public class LoadManagerShared {
}
final String antiAffinityGroup = antiAffinityGroupOptional.get();
final Map<String, Integer> brokerToAntiAffinityNamespaceCount =
new ConcurrentHashMap<>();
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange)
-> {
- nsToBundleRange.forEach((ns, bundleRange) -> {
- if (bundleRange.isEmpty()) {
- return;
- }
-
- CompletableFuture<Void> future = new CompletableFuture<>();
- futures.add(future);
- countAntiAffinityNamespaceOwnedBrokers(broker, ns, future,
+ final var brokerToNamespaces =
brokerToNamespaceToBundleRange.getBrokerToNamespacesMap();
+
FutureUtil.waitForAll(brokerToNamespaces.entrySet().stream().flatMap(e -> {
+ final var broker = e.getKey();
+ return e.getValue().stream().map(namespace -> {
+ final var future = new CompletableFuture<Void>();
+ countAntiAffinityNamespaceOwnedBrokers(broker, namespace,
future,
pulsar, antiAffinityGroup,
brokerToAntiAffinityNamespaceCount);
+ return future;
});
- });
- FutureUtil.waitForAll(futures)
- .thenAccept(r ->
antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
+ }).toList()).thenAccept(__ ->
antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
}).exceptionally(ex -> {
// namespace-policies has not been created yet
antiAffinityNsBrokersResult.complete(null);
@@ -698,7 +659,6 @@ public class LoadManagerShared {
* by different broker.
*
* @param namespace
- * @param bundle
* @param currentBroker
* @param pulsar
* @param brokerToNamespaceToBundleRange
@@ -707,10 +667,9 @@ public class LoadManagerShared {
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(
- String namespace, String bundle, String currentBroker,
+ String namespace, String currentBroker,
final PulsarService pulsar,
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange,
+ final BundleRangeCache brokerToNamespaceToBundleRange,
Set<String> candidateBrokers) throws Exception {
Map<String, Integer> brokerNamespaceCount =
getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 05c984d0349..48a6121b9dd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
@@ -72,8 +73,6 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
@@ -116,10 +115,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;
- // Map from brokers to namespaces to the bundle ranges in that namespace
assigned to that broker.
- // Used to distribute bundles within a namespace evenly across brokers.
- private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange;
+ private final BundleRangeCache brokerToNamespaceToBundleRange = new
BundleRangeCache();
// Path to the ZNode containing the LocalBrokerData json for this broker.
private String brokerZnodePath;
@@ -199,10 +195,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
- brokerToNamespaceToBundleRange =
- ConcurrentOpenHashMap.<String,
- ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>newBuilder()
- .build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
@@ -582,17 +574,9 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
TimeAverageBrokerData timeAverageData = new
TimeAverageBrokerData();
timeAverageData.reset(statsMap.keySet(), bundleData, defaultStats);
brokerData.setTimeAverageData(timeAverageData);
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
- brokerToNamespaceToBundleRange
- .computeIfAbsent(broker, k ->
- ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build());
- synchronized (namespaceToBundleRange) {
- namespaceToBundleRange.clear();
- LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(),
namespaceToBundleRange);
-
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(),
namespaceToBundleRange);
- }
+
+ brokerToNamespaceToBundleRange.reloadFromBundles(broker,
+ Stream.of(statsMap.keySet(),
preallocatedBundleData.keySet()).flatMap(Collection::stream));
}
// Remove not active bundle from loadData
@@ -736,7 +720,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);
- return
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
currentBroker, pulsar,
+ return
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker,
pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}
@@ -873,17 +857,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
- brokerToNamespaceToBundleRange
- .computeIfAbsent(broker,
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build());
- synchronized (namespaceToBundleRange) {
- namespaceToBundleRange.computeIfAbsent(namespaceName,
- k -> ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundleRange);
- }
+ brokerToNamespaceToBundleRange.add(broker, namespaceName, bundleRange);
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index be0580808ca..30a7359ce0e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -62,8 +64,6 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -107,10 +107,7 @@ public class SimpleLoadManagerImpl implements LoadManager,
Consumer<Notification
private final Set<String> bundleGainsCache;
private final Set<String> bundleLossesCache;
- // Map from brokers to namespaces to the bundle ranges in that namespace
assigned to that broker.
- // Used to distribute bundles within a namespace evenly across brokers.
- private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
- ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
+ private final BundleRangeCache brokerToNamespaceToBundleRange = new
BundleRangeCache();
// CPU usage per msg/sec
private double realtimeCpuLoadFactor = 0.025;
@@ -205,10 +202,6 @@ public class SimpleLoadManagerImpl implements LoadManager,
Consumer<Notification
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
- brokerToNamespaceToBundleRange =
- ConcurrentOpenHashMap.<String,
- ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>newBuilder()
- .build();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerId) {
@@ -853,14 +846,7 @@ public class SimpleLoadManagerImpl implements LoadManager,
Consumer<Notification
ResourceQuota quota = this.getResourceQuota(serviceUnitId);
// Add preallocated bundle range so incoming bundles from the
same namespace are not assigned to the
// same broker.
- brokerToNamespaceToBundleRange
- .computeIfAbsent(selectedRU.getResourceId(),
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build())
- .computeIfAbsent(namespaceName, k ->
-
ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundleRange);
+ brokerToNamespaceToBundleRange.add(selectedRU.getResourceId(),
namespaceName, bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
@@ -1272,15 +1258,8 @@ public class SimpleLoadManagerImpl implements
LoadManager, Consumer<Notification
final String broker = resourceUnit.getResourceId();
final Set<String> loadedBundles = ranking.getLoadedBundles();
final Set<String> preallocatedBundles =
resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
- final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
- brokerToNamespaceToBundleRange
- .computeIfAbsent(broker,
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build());
- namespaceToBundleRange.clear();
- LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles,
namespaceToBundleRange);
- LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles,
namespaceToBundleRange);
+ brokerToNamespaceToBundleRange.reloadFromBundles(broker,
+ Stream.of(loadedBundles,
preallocatedBundles).flatMap(Collection::stream));
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index fc2fec96294..f1e462c4ec7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.impl.BundleRangeCache;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
@@ -59,8 +60,6 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
@@ -167,12 +166,10 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
}
}
-
protected Object getBundleOwnershipData(){
- return ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>newBuilder().build();
+ return new BundleRangeCache();
}
-
protected String getLoadManagerClassName() {
return ModularLoadManagerImpl.class.getName();
}
@@ -366,17 +363,8 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
Object ownershipData,
String broker, String namespace, String assignedBundleName) {
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>
- brokerToNamespaceToBundleRange =
- (ConcurrentOpenHashMap<String,
- ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>) ownershipData;
- ConcurrentOpenHashSet<String> bundleSet =
- ConcurrentOpenHashSet.<String>newBuilder().build();
- bundleSet.add(assignedBundleName);
- ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
nsToBundleMap =
- ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build();
- nsToBundleMap.put(namespace, bundleSet);
- brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
+ final var brokerToNamespaceToBundleRange = (BundleRangeCache)
ownershipData;
+ brokerToNamespaceToBundleRange.add(broker, namespace,
assignedBundleName);
}
/**
@@ -562,10 +550,9 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
if (ownershipData instanceof Set) {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
assignedNamespace, brokers,
(Set<Map.Entry<String, ServiceUnitStateData>>)
ownershipData, brokerToDomainMap);
- } else if (ownershipData instanceof ConcurrentOpenHashMap) {
+ } else if (ownershipData instanceof BundleRangeCache) {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
assignedNamespace, brokers,
- (ConcurrentOpenHashMap<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
- ownershipData, brokerToDomainMap);
+ (BundleRangeCache) ownershipData, brokerToDomainMap);
} else {
throw new RuntimeException("Unknown ownershipData class type");
}
@@ -582,11 +569,9 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
if (ownershipData instanceof Set) {
return
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
currentBroker, pulsar, (Set<Map.Entry<String,
ServiceUnitStateData>>) ownershipData, candidate);
- } else if (ownershipData instanceof ConcurrentOpenHashMap) {
- return
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
- currentBroker, pulsar,
- (ConcurrentOpenHashMap<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
- ownershipData, candidate);
+ } else if (ownershipData instanceof BundleRangeCache) {
+ return
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker,
pulsar,
+ (BundleRangeCache) ownershipData, candidate);
} else {
throw new RuntimeException("Unknown ownershipData class type");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
index 8bc097779b0..465e8e2d852 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -18,13 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Set;
-
-import com.google.common.collect.Sets;
-
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,59 +33,44 @@ public class LoadManagerSharedTest {
String assignedBundle = namespace + "/0x00000000_0x40000000";
Set<String> candidates = new HashSet<>();
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>> map =
- ConcurrentOpenHashMap.<String,
- ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>>newBuilder()
- .build();
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+ final var cache = new BundleRangeCache();
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 0);
candidates = Sets.newHashSet("broker1");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 1);
Assert.assertTrue(candidates.contains("broker1"));
candidates = Sets.newHashSet("broker1");
- fillBrokerToNamespaceToBundleMap(map, "broker1", namespace,
"0x40000000_0x80000000");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+ cache.add("broker1", namespace, "0x40000000_0x80000000");
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 1);
Assert.assertTrue(candidates.contains("broker1"));
candidates = Sets.newHashSet("broker1", "broker2");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 1);
Assert.assertTrue(candidates.contains("broker2"));
candidates = Sets.newHashSet("broker1", "broker2");
- fillBrokerToNamespaceToBundleMap(map, "broker2", namespace,
"0x80000000_0xc0000000");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+ cache.add("broker2", namespace, "0x80000000_0xc0000000");
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 2);
Assert.assertTrue(candidates.contains("broker1"));
Assert.assertTrue(candidates.contains("broker2"));
candidates = Sets.newHashSet("broker1", "broker2");
- fillBrokerToNamespaceToBundleMap(map, "broker2", namespace,
"0xc0000000_0xd0000000");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+ cache.add("broker2", namespace, "0xc0000000_0xd0000000");
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 1);
Assert.assertTrue(candidates.contains("broker1"));
candidates = Sets.newHashSet("broker1", "broker2", "broker3");
- fillBrokerToNamespaceToBundleMap(map, "broker3", namespace,
"0xd0000000_0xffffffff");
-
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, map);
+ cache.add("broker3", namespace, "0xd0000000_0xffffffff");
+
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle,
candidates, cache);
Assert.assertEquals(candidates.size(), 2);
Assert.assertTrue(candidates.contains("broker1"));
Assert.assertTrue(candidates.contains("broker3"));
}
-
- private static void fillBrokerToNamespaceToBundleMap(
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>>> map,
- String broker, String namespace, String bundle) {
- map.computeIfAbsent(broker,
- k -> ConcurrentOpenHashMap.<String,
- ConcurrentOpenHashSet<String>>newBuilder().build())
- .computeIfAbsent(namespace,
- k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundle);
- }
-
}