This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 31d59cfa080 [branch-2.10][fix][broker] Avoid infinite bundle unloading (#20822) (#20877) 31d59cfa080 is described below commit 31d59cfa080d6fba89e2f7074faa1d5c5fe973f9 Author: Kai Wang <kw...@apache.org> AuthorDate: Tue Aug 8 14:41:49 2023 +0800 [branch-2.10][fix][broker] Avoid infinite bundle unloading (#20822) (#20877) --- .../broker/loadbalance/impl/LoadManagerShared.java | 6 + .../loadbalance/impl/ModularLoadManagerImpl.java | 170 ++++++++++++--------- .../{ => impl}/ModularLoadManagerImplTest.java | 62 ++++++-- 3 files changed, 155 insertions(+), 83 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index e0127bd3864..26c31a6e8a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -554,4 +554,10 @@ public class LoadManagerShared { brokerCandidateCache.addAll(filteredBrokerCandidates); } } + + public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 07408305289..301f19213c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -642,9 +643,21 @@ public class ModularLoadManagerImpl implements ModularLoadManager { if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { return; } + NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle); + Optional<String> destBroker = this.selectBroker(bundleToUnload); + if (!destBroker.isPresent()) { + log.info("[{}] No broker available to unload bundle {} from broker {}", + strategy.getClass().getSimpleName(), bundle, broker); + return; + } + if (destBroker.get().equals(broker)) { + log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}", + strategy.getClass().getSimpleName(), destBroker.get(), bundle); + return; + } - log.info("[{}] Unloading bundle: {} from broker {}", - strategy.getClass().getSimpleName(), bundle, broker); + log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}", + strategy.getClass().getSimpleName(), bundle, broker, destBroker.get()); try { pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange); loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); @@ -799,16 +812,56 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // If the given bundle is already in preallocated, return the selected broker. return Optional.of(preallocatedBundleToBroker.get(bundle)); } - final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, - key -> getBundleDataOrDefault(bundle)); - brokerCandidateCache.clear(); - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - // filter brokers which owns topic higher than threshold - LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, - conf.getLoadBalancerBrokerMaxTopics()); + Optional<String> broker = selectBroker(serviceUnit); + if (!broker.isPresent()) { + // If no broker is selected, return empty. + return broker; + } + // Add new bundle to preallocated. + preallocateBundle(bundle, broker.get()); + return broker; + } + } finally { + selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + } + } + + private void preallocateBundle(String bundle, String broker) { + final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, + key -> getBundleDataOrDefault(bundle)); + loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data); + preallocatedBundleToBroker.put(bundle, broker); + + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = + brokerToNamespaceToBundleRange + .computeIfAbsent(broker, + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder() + .build()); + synchronized (namespaceToBundleRange) { + namespaceToBundleRange.computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) + .add(bundleRange); + } + } + + @VisibleForTesting + Optional<String> selectBroker(final ServiceUnitId serviceUnit) { + synchronized (brokerCandidateCache) { + final String bundle = serviceUnit.toString(); + final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, + key -> getBundleDataOrDefault(bundle)); + brokerCandidateCache.clear(); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + + // filter brokers which owns topic higher than threshold + LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, + conf.getLoadBalancerBrokerMaxTopics()); // distribute namespaces to domain and brokers according to anti-affinity-group LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), @@ -820,71 +873,50 @@ public class ModularLoadManagerImpl implements ModularLoadManager { brokerToNamespaceToBundleRange); log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle); - // Use the filter pipeline to finalize broker candidates. - try { - for (BrokerFilter filter : filterPipeline) { - filter.filter(brokerCandidateCache, data, loadData, conf); - } - } catch (BrokerFilterException x) { - // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - } - - if (brokerCandidateCache.isEmpty()) { - // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); + // Use the filter pipeline to finalize broker candidates. + try { + for (BrokerFilter filter : filterPipeline) { + filter.filter(brokerCandidateCache, data, loadData, conf); } + } catch (BrokerFilterException x) { + // restore the list of brokers to the full set + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + } - // Choose a broker among the potentially smaller filtered list, when possible - Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); - if (log.isDebugEnabled()) { - log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache); - } + if (brokerCandidateCache.isEmpty()) { + // restore the list of brokers to the full set + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + } - if (!broker.isPresent()) { - // No brokers available - return broker; - } + // Choose a broker among the potentially smaller filtered list, when possible + Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + if (log.isDebugEnabled()) { + log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache); + } - final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; - final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); - if (maxUsage > overloadThreshold) { - // All brokers that were in the filtered list were overloaded, so check if there is a better broker - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - Optional<String> brokerTmp = - placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); - if (brokerTmp.isPresent()) { - broker = brokerTmp; - } - } + if (!broker.isPresent()) { + // No brokers available + return broker; + } - // Add new bundle to preallocated. - loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data); - preallocatedBundleToBroker.put(bundle, broker.get()); - - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); - final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = - brokerToNamespaceToBundleRange - .computeIfAbsent(broker.get(), - k -> ConcurrentOpenHashMap.<String, - ConcurrentOpenHashSet<String>>newBuilder() - .build()); - synchronized (namespaceToBundleRange) { - namespaceToBundleRange.computeIfAbsent(namespaceName, - k -> ConcurrentOpenHashSet.<String>newBuilder().build()) - .add(bundleRange); + final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; + final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); + if (maxUsage > overloadThreshold) { + // All brokers that were in the filtered list were overloaded, so check if there is a better broker + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + Optional<String> brokerTmp = + placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + if (brokerTmp.isPresent()) { + broker = brokerTmp; } - return broker; } - } finally { - selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + return broker; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java similarity index 91% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 6b5898f4174..dae160b1421 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance; +package org.apache.pulsar.broker.loadbalance.impl; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,12 +53,12 @@ import org.apache.pulsar.broker.BundleData; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.TimeAverageMessageData; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; -import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -329,38 +331,70 @@ public class ModularLoadManagerImplTest { bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]); return null; }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + + AtomicReference<Optional<String>> selectedBrokerRef = new AtomicReference<>(); + ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager); + doAnswer(invocation -> { + ServiceUnitId serviceUnitId = (ServiceUnitId) invocation.getArguments()[0]; + Optional<String> broker = primaryLoadManager.selectBroker(serviceUnitId); + selectedBrokerRef.set(broker); + return broker; + }).when(primaryLoadManagerSpy).selectBroker(any()); + setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1); pulsar1.getConfiguration().setLoadBalancerEnabled(true); - final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); + final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData"); final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData(); final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost)); when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData); brokerDataMap.put(primaryHost, brokerDataSpy1); // Need to update all the bundle data for the shredder to see the spy. - primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); + primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); Thread.sleep(100); localBrokerData.setCpu(new ResourceUsage(80, 100)); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // 80% is below overload threshold: verify nothing is unloaded. - verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(0)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); localBrokerData.setCpu(new ResourceUsage(90, 100)); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Most expensive bundle will be unloaded. - verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(1)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(2)); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be // unloaded, but this is not the case due to the spy's behavior). - verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(2)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(1)); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Now both are in grace period: neither should be unloaded. - verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(2)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); + + // Test bundle transfer to same broker + + loadData.getRecentlyUnloadedBundles().clear(); + primaryLoadManagerSpy.doLoadShedding(); + verify(namespacesSpy1, Mockito.times(3)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + + doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any()); + loadData.getRecentlyUnloadedBundles().clear(); + primaryLoadManagerSpy.doLoadShedding(); + // The bundle shouldn't be unloaded because the broker is the same. + verify(namespacesSpy1, Mockito.times(3)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + } // Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain