This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 31d59cfa080 [branch-2.10][fix][broker] Avoid infinite bundle unloading (#20822) (#20877)
31d59cfa080 is described below

commit 31d59cfa080d6fba89e2f7074faa1d5c5fe973f9
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Aug 8 14:41:49 2023 +0800

    [branch-2.10][fix][broker] Avoid infinite bundle unloading (#20822) (#20877)
---
 .../broker/loadbalance/impl/LoadManagerShared.java |   6 +
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 170 ++++++++++++---------
 .../{ => impl}/ModularLoadManagerImplTest.java     |  62 ++++++--
 3 files changed, 155 insertions(+), 83 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index e0127bd3864..26c31a6e8a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -554,4 +554,10 @@ public class LoadManagerShared {
             brokerCandidateCache.addAll(filteredBrokerCandidates);
         }
     }
+
+    public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
+        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+        final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+        return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 07408305289..301f19213c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -642,9 +643,21 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.info("[{}] No broker available to unload bundle {} from broker {}",
+                                strategy.getClass().getSimpleName(), bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}",
+                                strategy.getClass().getSimpleName(), destBroker.get(), bundle);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
+                            strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
                     try {
                         pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                         loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
@@ -799,16 +812,56 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                     // If the given bundle is already in preallocated, return the selected broker.
                     return Optional.of(preallocatedBundleToBroker.get(bundle));
                 }
-                final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
-                        key -> getBundleDataOrDefault(bundle));
-                brokerCandidateCache.clear();
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        getAvailableBrokers(),
-                        brokerTopicLoadingPredicate);
 
-                // filter brokers which owns topic higher than threshold
-                LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
-                        conf.getLoadBalancerBrokerMaxTopics());
+                Optional<String> broker = selectBroker(serviceUnit);
+                if (!broker.isPresent()) {
+                    // If no broker is selected, return empty.
+                    return broker;
+                }
+                // Add new bundle to preallocated.
+                preallocateBundle(bundle, broker.get());
+                return broker;
+            }
+        } finally {
+            selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    private void preallocateBundle(String bundle, String broker) {
+        final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
+                key -> getBundleDataOrDefault(bundle));
+        loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
+        preallocatedBundleToBroker.put(bundle, broker);
+
+        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+        final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
+                brokerToNamespaceToBundleRange
+                        .computeIfAbsent(broker,
+                                k -> ConcurrentOpenHashMap.<String,
+                                        ConcurrentOpenHashSet<String>>newBuilder()
+                                        .build());
+        synchronized (namespaceToBundleRange) {
+            namespaceToBundleRange.computeIfAbsent(namespaceName,
+                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .add(bundleRange);
+        }
+    }
+
+    @VisibleForTesting
+    Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
+        synchronized (brokerCandidateCache) {
+            final String bundle = serviceUnit.toString();
+            final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
+                    key -> getBundleDataOrDefault(bundle));
+            brokerCandidateCache.clear();
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                    getAvailableBrokers(),
+                    brokerTopicLoadingPredicate);
+
+            // filter brokers which owns topic higher than threshold
+            LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
+                    conf.getLoadBalancerBrokerMaxTopics());
 
                 // distribute namespaces to domain and brokers according to anti-affinity-group
                 LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
@@ -820,71 +873,50 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                         brokerToNamespaceToBundleRange);
                 log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
 
-                // Use the filter pipeline to finalize broker candidates.
-                try {
-                    for (BrokerFilter filter : filterPipeline) {
-                        filter.filter(brokerCandidateCache, data, loadData, conf);
-                    }
-                } catch (BrokerFilterException x) {
-                    // restore the list of brokers to the full set
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
-                }
-
-                if (brokerCandidateCache.isEmpty()) {
-                    // restore the list of brokers to the full set
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
+            // Use the filter pipeline to finalize broker candidates.
+            try {
+                for (BrokerFilter filter : filterPipeline) {
+                    filter.filter(brokerCandidateCache, data, loadData, conf);
                 }
+            } catch (BrokerFilterException x) {
+                // restore the list of brokers to the full set
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+            }
 
-                // Choose a broker among the potentially smaller filtered list, when possible
-                Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
-                if (log.isDebugEnabled()) {
-                    log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
-                }
+            if (brokerCandidateCache.isEmpty()) {
+                // restore the list of brokers to the full set
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+            }
 
-                if (!broker.isPresent()) {
-                    // No brokers available
-                    return broker;
-                }
+            // Choose a broker among the potentially smaller filtered list, when possible
+            Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+            if (log.isDebugEnabled()) {
+                log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
+            }
 
-                final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
-                final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
-                if (maxUsage > overloadThreshold) {
-                    // All brokers that were in the filtered list were overloaded, so check if there is a better broker
-                    LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                            getAvailableBrokers(),
-                            brokerTopicLoadingPredicate);
-                    Optional<String> brokerTmp =
-                            placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
-                    if (brokerTmp.isPresent()) {
-                        broker = brokerTmp;
-                    }
-                }
+            if (!broker.isPresent()) {
+                // No brokers available
+                return broker;
+            }
 
-                // Add new bundle to preallocated.
-                loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
-                preallocatedBundleToBroker.put(bundle, broker.get());
-
-                final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
-                final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
-                final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
-                        brokerToNamespaceToBundleRange
-                                .computeIfAbsent(broker.get(),
-                                        k -> ConcurrentOpenHashMap.<String,
-                                                ConcurrentOpenHashSet<String>>newBuilder()
-                                                .build());
-                synchronized (namespaceToBundleRange) {
-                    namespaceToBundleRange.computeIfAbsent(namespaceName,
-                            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
-                            .add(bundleRange);
+            final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+            final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
+            if (maxUsage > overloadThreshold) {
+                // All brokers that were in the filtered list were overloaded, so check if there is a better broker
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                        getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
+                Optional<String> brokerTmp =
+                        placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+                if (brokerTmp.isPresent()) {
+                    broker = brokerTmp;
                 }
-                return broker;
             }
-        } finally {
-            selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+            return broker;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
similarity index 91%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 6b5898f4174..dae160b1421 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.loadbalance;
+package org.apache.pulsar.broker.loadbalance.impl;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,12 +53,12 @@ import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
-import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -329,38 +331,70 @@ public class ModularLoadManagerImplTest {
             bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]);
             return null;
         }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
+
+        AtomicReference<Optional<String>> selectedBrokerRef = new AtomicReference<>();
+        ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+        doAnswer(invocation -> {
+            ServiceUnitId serviceUnitId = (ServiceUnitId) invocation.getArguments()[0];
+            Optional<String> broker = primaryLoadManager.selectBroker(serviceUnitId);
+            selectedBrokerRef.set(broker);
+            return broker;
+        }).when(primaryLoadManagerSpy).selectBroker(any());
+
         setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
         pulsar1.getConfiguration().setLoadBalancerEnabled(true);
-        final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
+        final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
         final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
         final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
         when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
         brokerDataMap.put(primaryHost, brokerDataSpy1);
         // Need to update all the bundle data for the shredder to see the spy.
-        primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
+        primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
 
         Thread.sleep(100);
         localBrokerData.setCpu(new ResourceUsage(80, 100));
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
 
         // 80% is below overload threshold: verify nothing is unloaded.
-        verify(namespacesSpy1, 
Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(0))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
 
         localBrokerData.setCpu(new ResourceUsage(90, 100));
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Most expensive bundle will be unloaded.
-        verify(namespacesSpy1, 
Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(1))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(2));
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
 
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Now less expensive bundle will be unloaded (normally other bundle 
would move off and nothing would be
         // unloaded, but this is not the case due to the spy's behavior).
-        verify(namespacesSpy1, 
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(2))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(1));
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
 
-        primaryLoadManager.doLoadShedding();
+        primaryLoadManagerSpy.doLoadShedding();
         // Now both are in grace period: neither should be unloaded.
-        verify(namespacesSpy1, 
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        verify(namespacesSpy1, Mockito.times(2))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+
+        // Test bundle transfer to same broker
+
+        loadData.getRecentlyUnloadedBundles().clear();
+        primaryLoadManagerSpy.doLoadShedding();
+        verify(namespacesSpy1, Mockito.times(3))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+
+        doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
+        loadData.getRecentlyUnloadedBundles().clear();
+        primaryLoadManagerSpy.doLoadShedding();
+        // The bundle shouldn't be unloaded because the broker is the same.
+        verify(namespacesSpy1, Mockito.times(3))
+                .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+
     }
 
     // Test that ModularLoadManagerImpl will determine that writing local data 
to ZooKeeper is necessary if certain

Reply via email to