This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f674ad1e64e [fix][broker] Avoid infinite bundle unloading (#20822)
f674ad1e64e is described below
commit f674ad1e64ef5671be4383edc3949f47af4ca4d3
Author: Kai Wang <[email protected]>
AuthorDate: Tue Jul 25 22:33:59 2023 +0800
[fix][broker] Avoid infinite bundle unloading (#20822)
(cherry picked from commit 3f637680bbbc268d4801172d3212279ace38c4d4)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 200 ++++++++++++---------
.../{ => impl}/ModularLoadManagerImplTest.java | 65 +++++--
2 files changed, 166 insertions(+), 99 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 30a2ef5cdf2..aa3b3c49646 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
@@ -661,11 +662,24 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.info("[{}] No broker available to unload bundle {}
from broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("[{}] The destination broker {} is the same
as the current owner broker for Bundle {}",
+ strategy.getClass().getSimpleName(),
destBroker.get(), bundle);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
try {
-
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
+ pulsar.getAdminClient().namespaces()
+ .unloadNamespaceBundle(namespaceName,
bundleRange, destBroker.get());
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
} catch (PulsarServerException | PulsarAdminException e) {
log.warn("Error when trying to perform load shedding
on {} for broker {}", bundle, broker, e);
@@ -839,99 +853,119 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// If the given bundle is already in preallocated, return
the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
- final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
- key -> getBundleDataOrDefault(bundle));
- brokerCandidateCache.clear();
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- // filter brokers which owns topic higher than threshold
-
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache,
loadData,
- conf.getLoadBalancerBrokerMaxTopics());
-
- // distribute namespaces to domain and brokers according to
anti-affinity-group
- LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
serviceUnit.toString(),
- brokerCandidateCache,
- brokerToNamespaceToBundleRange,
brokerToFailureDomainMap);
-
- // distribute bundles evenly to candidate-brokers if enable
- if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
-
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
- brokerCandidateCache,
- brokerToNamespaceToBundleRange);
- if (log.isDebugEnabled()) {
- log.debug("enable distribute bundles evenly to
candidate-brokers, broker candidate count={}",
- brokerCandidateCache.size());
- }
+ Optional<String> broker = selectBroker(serviceUnit);
+ if (!broker.isPresent()) {
+ // If no broker is selected, return empty.
+ return broker;
}
- log.info("{} brokers being considered for assignment of {}",
brokerCandidateCache.size(), bundle);
+ // Add new bundle to preallocated.
+ preallocateBundle(bundle, broker.get());
+ return broker;
+ }
+ } finally {
+ selectBrokerForAssignment.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ }
+ }
- // Use the filter pipeline to finalize broker candidates.
- try {
- for (BrokerFilter filter : filterPipeline) {
- filter.filter(brokerCandidateCache, data, loadData,
conf);
- }
- } catch (BrokerFilterException x) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- }
+ private void preallocateBundle(String bundle, String broker) {
+ final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
+ key -> getBundleDataOrDefault(bundle));
+
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle,
data);
+ preallocatedBundleToBroker.put(bundle, broker);
+
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
+ brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+
ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundleRange);
+ }
+ }
- if (brokerCandidateCache.isEmpty()) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- }
+ @VisibleForTesting
+ Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
+ synchronized (brokerCandidateCache) {
+ final String bundle = serviceUnit.toString();
+ final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
+ key -> getBundleDataOrDefault(bundle));
+ brokerCandidateCache.clear();
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies,
brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+
+ // filter brokers which owns topic higher than threshold
+
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache,
loadData,
+ conf.getLoadBalancerBrokerMaxTopics());
- // Choose a broker among the potentially smaller filtered
list, when possible
- Optional<String> broker =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+ // distribute namespaces to domain and brokers according to
anti-affinity-group
+ LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
bundle,
+ brokerCandidateCache,
+ brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
+
+ // distribute bundles evenly to candidate-brokers if enable
+ if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
+
LoadManagerShared.removeMostServicingBrokersForNamespace(bundle,
+ brokerCandidateCache,
+ brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
- log.debug("Selected broker {} from candidate brokers {}",
broker, brokerCandidateCache);
+ log.debug("enable distribute bundles evenly to
candidate-brokers, broker candidate count={}",
+ brokerCandidateCache.size());
}
+ }
- if (!broker.isPresent()) {
- // No brokers available
- return broker;
- }
+ log.info("{} brokers being considered for assignment of {}",
brokerCandidateCache.size(), bundle);
- final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
- final double maxUsage =
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
- if (maxUsage > overloadThreshold) {
- // All brokers that were in the filtered list were
overloaded, so check if there is a better broker
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- Optional<String> brokerTmp =
-
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
- if (brokerTmp.isPresent()) {
- broker = brokerTmp;
- }
+ // Use the filter pipeline to finalize broker candidates.
+ try {
+ for (BrokerFilter filter : filterPipeline) {
+ filter.filter(brokerCandidateCache, data, loadData, conf);
}
+ } catch (BrokerFilterException x) {
+ // restore the list of brokers to the full set
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ }
- // Add new bundle to preallocated.
-
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle,
data);
- preallocatedBundleToBroker.put(bundle, broker.get());
-
- final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
- final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
- final ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>> namespaceToBundleRange =
- brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.get(),
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build());
- synchronized (namespaceToBundleRange) {
- namespaceToBundleRange.computeIfAbsent(namespaceName,
- k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundleRange);
- }
+ if (brokerCandidateCache.isEmpty()) {
+ // restore the list of brokers to the full set
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ }
+
+ // Choose a broker among the potentially smaller filtered list,
when possible
+ Optional<String> broker =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+ if (log.isDebugEnabled()) {
+ log.debug("Selected broker {} from candidate brokers {}",
broker, brokerCandidateCache);
+ }
+
+ if (!broker.isPresent()) {
+ // No brokers available
return broker;
}
- } finally {
- selectBrokerForAssignment.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+
+ final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+ final double maxUsage =
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
+ if (maxUsage > overloadThreshold) {
+ // All brokers that were in the filtered list were overloaded,
so check if there is a better broker
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ Optional<String> brokerTmp =
+ placementStrategy.selectBroker(brokerCandidateCache,
data, loadData, conf);
+ if (brokerTmp.isPresent()) {
+ broker = brokerTmp;
+ }
+ }
+ return broker;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
similarity index 92%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 4b9f679f19d..3fb62f486ab 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.loadbalance;
+package org.apache.pulsar.broker.loadbalance.impl;
import static java.lang.Thread.sleep;
import static
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -55,11 +57,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
-import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
@@ -410,39 +411,71 @@ public class ModularLoadManagerImplTest {
doAnswer(invocation -> {
bundleReference.set(invocation.getArguments()[0].toString() + '/'
+ invocation.getArguments()[1]);
return null;
- }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
+
+ AtomicReference<Optional<String>> selectedBrokerRef = new
AtomicReference<>();
+ ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+ doAnswer(invocation -> {
+ ServiceUnitId serviceUnitId = (ServiceUnitId)
invocation.getArguments()[0];
+ Optional<String> broker =
primaryLoadManager.selectBroker(serviceUnitId);
+ selectedBrokerRef.set(broker);
+ return broker;
+ }).when(primaryLoadManagerSpy).selectBroker(any());
+
setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
- final LoadData loadData = (LoadData) getField(primaryLoadManager,
"loadData");
+ final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy,
"loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
- primaryLoadManager.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
+ primaryLoadManagerSpy.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// 80% is below overload threshold: verify nothing is unloaded.
- verify(namespacesSpy1,
Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(0))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
localBrokerData.setCpu(new ResourceUsage(90, 100));
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Most expensive bundle will be unloaded.
- verify(namespacesSpy1,
Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(1))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle
would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
- verify(namespacesSpy1,
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(2))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
- verify(namespacesSpy1,
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(2))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+
+ // Test bundle transfer to same broker
+
+ loadData.getRecentlyUnloadedBundles().clear();
+ primaryLoadManagerSpy.doLoadShedding();
+ verify(namespacesSpy1, Mockito.times(3))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
+
+
doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
+ loadData.getRecentlyUnloadedBundles().clear();
+ primaryLoadManagerSpy.doLoadShedding();
+ // The bundle shouldn't be unloaded because the broker is the same.
+ verify(namespacesSpy1, Mockito.times(3))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
+
}
// Test that ModularLoadManagerImpl will determine that writing local data
to ZooKeeper is necessary if certain