This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f674ad1e64e [fix][broker] Avoid infinite bundle unloading (#20822)
f674ad1e64e is described below

commit f674ad1e64ef5671be4383edc3949f47af4ca4d3
Author: Kai Wang <[email protected]>
AuthorDate: Tue Jul 25 22:33:59 2023 +0800

    [fix][broker] Avoid infinite bundle unloading (#20822)
    
    (cherry picked from commit 3f637680bbbc268d4801172d3212279ace38c4d4)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 200 ++++++++++++---------
 .../{ => impl}/ModularLoadManagerImplTest.java     |  65 +++++--
 2 files changed, 166 insertions(+), 99 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 30a2ef5cdf2..aa3b3c49646 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
@@ -661,11 +662,24 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = 
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = 
this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.info("[{}] No broker available to unload bundle {} 
from broker {}",
+                                strategy.getClass().getSimpleName(), bundle, 
broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("[{}] The destination broker {} is the same 
as the current owner broker for Bundle {}",
+                                strategy.getClass().getSimpleName(), 
destBroker.get(), bundle);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, 
broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest 
broker {}",
+                            strategy.getClass().getSimpleName(), bundle, 
broker, destBroker.get());
                     try {
-                        
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, 
bundleRange);
+                        pulsar.getAdminClient().namespaces()
+                                .unloadNamespaceBundle(namespaceName, 
bundleRange, destBroker.get());
                         loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
                     } catch (PulsarServerException | PulsarAdminException e) {
                         log.warn("Error when trying to perform load shedding 
on {} for broker {}", bundle, broker, e);
@@ -839,99 +853,119 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                     // If the given bundle is already in preallocated, return 
the selected broker.
                     return Optional.of(preallocatedBundleToBroker.get(bundle));
                 }
-                final BundleData data = 
loadData.getBundleData().computeIfAbsent(bundle,
-                        key -> getBundleDataOrDefault(bundle));
-                brokerCandidateCache.clear();
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
-                        getAvailableBrokers(),
-                        brokerTopicLoadingPredicate);
 
-                // filter brokers which owns topic higher than threshold
-                
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, 
loadData,
-                        conf.getLoadBalancerBrokerMaxTopics());
-
-                // distribute namespaces to domain and brokers according to 
anti-affinity-group
-                LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, 
serviceUnit.toString(),
-                        brokerCandidateCache,
-                        brokerToNamespaceToBundleRange, 
brokerToFailureDomainMap);
-
-                // distribute bundles evenly to candidate-brokers if enable
-                if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
-                    
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
-                            brokerCandidateCache,
-                            brokerToNamespaceToBundleRange);
-                    if (log.isDebugEnabled()) {
-                        log.debug("enable distribute bundles evenly to 
candidate-brokers, broker candidate count={}",
-                                brokerCandidateCache.size());
-                    }
+                Optional<String> broker = selectBroker(serviceUnit);
+                if (!broker.isPresent()) {
+                    // If no broker is selected, return empty.
+                    return broker;
                 }
-                log.info("{} brokers being considered for assignment of {}", 
brokerCandidateCache.size(), bundle);
+                // Add new bundle to preallocated.
+                preallocateBundle(bundle, broker.get());
+                return broker;
+            }
+        } finally {
+            selectBrokerForAssignment.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
+        }
+    }
 
-                // Use the filter pipeline to finalize broker candidates.
-                try {
-                    for (BrokerFilter filter : filterPipeline) {
-                        filter.filter(brokerCandidateCache, data, loadData, 
conf);
-                    }
-                } catch (BrokerFilterException x) {
-                    // restore the list of brokers to the full set
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
-                }
+    private void preallocateBundle(String bundle, String broker) {
+        final BundleData data = 
loadData.getBundleData().computeIfAbsent(bundle,
+                key -> getBundleDataOrDefault(bundle));
+        
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, 
data);
+        preallocatedBundleToBroker.put(bundle, broker);
+
+        final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+        final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> 
namespaceToBundleRange =
+                brokerToNamespaceToBundleRange
+                        .computeIfAbsent(broker,
+                                k -> ConcurrentOpenHashMap.<String,
+                                        
ConcurrentOpenHashSet<String>>newBuilder()
+                                        .build());
+        synchronized (namespaceToBundleRange) {
+            namespaceToBundleRange.computeIfAbsent(namespaceName,
+                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .add(bundleRange);
+        }
+    }
 
-                if (brokerCandidateCache.isEmpty()) {
-                    // restore the list of brokers to the full set
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
-                }
+    @VisibleForTesting
+    Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
+        synchronized (brokerCandidateCache) {
+            final String bundle = serviceUnit.toString();
+            final BundleData data = 
loadData.getBundleData().computeIfAbsent(bundle,
+                    key -> getBundleDataOrDefault(bundle));
+            brokerCandidateCache.clear();
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, 
brokerCandidateCache,
+                    getAvailableBrokers(),
+                    brokerTopicLoadingPredicate);
+
+            // filter brokers which owns topic higher than threshold
+            
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, 
loadData,
+                    conf.getLoadBalancerBrokerMaxTopics());
 
-                // Choose a broker among the potentially smaller filtered 
list, when possible
-                Optional<String> broker = 
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+            // distribute namespaces to domain and brokers according to 
anti-affinity-group
+            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, 
bundle,
+                    brokerCandidateCache,
+                    brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
+
+            // distribute bundles evenly to candidate-brokers if enable
+            if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
+                
LoadManagerShared.removeMostServicingBrokersForNamespace(bundle,
+                        brokerCandidateCache,
+                        brokerToNamespaceToBundleRange);
                 if (log.isDebugEnabled()) {
-                    log.debug("Selected broker {} from candidate brokers {}", 
broker, brokerCandidateCache);
+                    log.debug("enable distribute bundles evenly to 
candidate-brokers, broker candidate count={}",
+                            brokerCandidateCache.size());
                 }
+            }
 
-                if (!broker.isPresent()) {
-                    // No brokers available
-                    return broker;
-                }
+            log.info("{} brokers being considered for assignment of {}", 
brokerCandidateCache.size(), bundle);
 
-                final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
-                final double maxUsage = 
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
-                if (maxUsage > overloadThreshold) {
-                    // All brokers that were in the filtered list were 
overloaded, so check if there is a better broker
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
-                    Optional<String> brokerTmp =
-                            
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
-                    if (brokerTmp.isPresent()) {
-                        broker = brokerTmp;
-                    }
+            // Use the filter pipeline to finalize broker candidates.
+            try {
+                for (BrokerFilter filter : filterPipeline) {
+                    filter.filter(brokerCandidateCache, data, loadData, conf);
                 }
+            } catch (BrokerFilterException x) {
+                // restore the list of brokers to the full set
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+            }
 
-                // Add new bundle to preallocated.
-                
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle,
 data);
-                preallocatedBundleToBroker.put(bundle, broker.get());
-
-                final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
-                final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-                final ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>> namespaceToBundleRange =
-                        brokerToNamespaceToBundleRange
-                                .computeIfAbsent(broker.get(),
-                                        k -> ConcurrentOpenHashMap.<String,
-                                                
ConcurrentOpenHashSet<String>>newBuilder()
-                                                .build());
-                synchronized (namespaceToBundleRange) {
-                    namespaceToBundleRange.computeIfAbsent(namespaceName,
-                            k -> 
ConcurrentOpenHashSet.<String>newBuilder().build())
-                            .add(bundleRange);
-                }
+            if (brokerCandidateCache.isEmpty()) {
+                // restore the list of brokers to the full set
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+            }
+
+            // Choose a broker among the potentially smaller filtered list, 
when possible
+            Optional<String> broker = 
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+            if (log.isDebugEnabled()) {
+                log.debug("Selected broker {} from candidate brokers {}", 
broker, brokerCandidateCache);
+            }
+
+            if (!broker.isPresent()) {
+                // No brokers available
                 return broker;
             }
-        } finally {
-            selectBrokerForAssignment.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
+
+            final double overloadThreshold = 
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+            final double maxUsage = 
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
+            if (maxUsage > overloadThreshold) {
+                // All brokers that were in the filtered list were overloaded, 
so check if there is a better broker
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+                Optional<String> brokerTmp =
+                        placementStrategy.selectBroker(brokerCandidateCache, 
data, loadData, conf);
+                if (brokerTmp.isPresent()) {
+                    broker = brokerTmp;
+                }
+            }
+            return broker;
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
similarity index 92%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 4b9f679f19d..3fb62f486ab 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.loadbalance;
+package org.apache.pulsar.broker.loadbalance.impl;
 
 import static java.lang.Thread.sleep;
 import static 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -55,11 +57,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
-import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Producer;
@@ -410,39 +411,71 @@ public class ModularLoadManagerImplTest {
         doAnswer(invocation -> {
             bundleReference.set(invocation.getArguments()[0].toString() + '/' 
+ invocation.getArguments()[1]);
             return null;
-        }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
+
+        AtomicReference<Optional<String>> selectedBrokerRef = new 
AtomicReference<>();
+        ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+        doAnswer(invocation -> {
+            ServiceUnitId serviceUnitId = (ServiceUnitId) 
invocation.getArguments()[0];
+            Optional<String> broker = 
primaryLoadManager.selectBroker(serviceUnitId);
+            selectedBrokerRef.set(broker);
+            return broker;
+        }).when(primaryLoadManagerSpy).selectBroker(any());
+
         setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
         pulsar1.getConfiguration().setLoadBalancerEnabled(true);
-        final LoadData loadData = (LoadData) getField(primaryLoadManager, 
"loadData");
+        final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, 
"loadData");
         final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
         final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
         when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
         brokerDataMap.put(primaryHost, brokerDataSpy1);
         // Need to update all the bundle data for the shredder to see the spy.
-        primaryLoadManager.handleDataNotification(new 
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + 
"/broker:8080"));
+        primaryLoadManagerSpy.handleDataNotification(new 
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + 
"/broker:8080"));
 
         sleep(100);
         localBrokerData.setCpu(new ResourceUsage(80, 100));
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
 
         // 80% is below overload threshold: verify nothing is unloaded.
-        verify(namespacesSpy1, 
Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(0))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
 
         localBrokerData.setCpu(new ResourceUsage(90, 100));
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Most expensive bundle will be unloaded.
-        verify(namespacesSpy1, 
Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(1))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(2));
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
 
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Now less expensive bundle will be unloaded (normally other bundle 
would move off and nothing would be
         // unloaded, but this is not the case due to the spy's behavior).
-        verify(namespacesSpy1, 
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(2))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(1));
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
 
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Now both are in grace period: neither should be unloaded.
-        verify(namespacesSpy1, 
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(2))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+
+        // Test bundle transfer to same broker
+
+        loadData.getRecentlyUnloadedBundles().clear();
+        primaryLoadManagerSpy.doLoadShedding();
+        verify(namespacesSpy1, Mockito.times(3))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
+
+        
doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
+        loadData.getRecentlyUnloadedBundles().clear();
+        primaryLoadManagerSpy.doLoadShedding();
+        // The bundle shouldn't be unloaded because the broker is the same.
+        verify(namespacesSpy1, Mockito.times(3))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
+
     }
 
     // Test that ModularLoadManagerImpl will determine that writing local data 
to ZooKeeper is necessary if certain

Reply via email to