This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b315eb2b68 [improve][broker]add NamespacePolicies check before unload 
bundle when doLoadShedding (#16476)
0b315eb2b68 is described below

commit 0b315eb2b688bc84c1062ef260bdb020304f9194
Author: lixinyang <[email protected]>
AuthorDate: Thu Jul 21 10:45:34 2022 +0800

    [improve][broker]add NamespacePolicies check before unload bundle when 
doLoadShedding (#16476)
    
    * add NamespacePolicies check before unload bundle when doLoadShedding
    
    * fix typos
    
    Co-authored-by: nicklixinyang <[email protected]>
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 18 ++++++++
 .../loadbalance/ModularLoadManagerImplTest.java    | 51 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 5cb30eb6ad5..8c0df048de7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -656,6 +656,10 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                 bundles.forEach(bundle -> {
                     final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                     final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                    if (!shouldNamespacePoliciesUnload(namespaceName, 
bundleRange, broker)) {
+                        return;
+                    }
+
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
                         return;
                     }
@@ -696,6 +700,20 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         this.bundleUnloadMetrics.set(metrics);
     }
 
+    public boolean shouldNamespacePoliciesUnload(String namespace, String 
bundle, String currentBroker) {
+        synchronized (brokerCandidateCache) {
+            brokerCandidateCache.clear();
+            ServiceUnitId serviceUnit = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespace, bundle);
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, 
brokerCandidateCache,
+                    getAvailableBrokers(), brokerTopicLoadingPredicate);
+
+            // if only current broker satisfy the NamespacePolicies should not 
unload.
+            brokerCandidateCache.remove(currentBroker);
+            return !brokerCandidateCache.isEmpty();
+        }
+    }
+
     public boolean shouldAntiAffinityNamespaceUnload(String namespace, String 
bundle, String currentBroker) {
         try {
             Optional<LocalPolicies> nsPolicies = 
pulsar.getPulsarResources().getLocalPolicies()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 12f95a2b796..3169fbb32fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -58,6 +59,8 @@ import 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -578,6 +581,54 @@ public class ModularLoadManagerImplTest {
 
     }
 
+    @Test
+    public void testLoadSheddingWithNamespaceIsolationPolicies() throws 
Exception {
+
+        final String cluster = "use";
+        final String tenant = "my-tenant";
+        final String namespace = "my-tenant/use/my-ns";
+        final String bundle = "0x00000000_0xffffffff";
+        final String brokerAddress = pulsar1.getAdvertisedAddress();
+        final String broker1Address = pulsar1.getAdvertisedAddress() + 1;
+
+        admin1.clusters().createCluster(cluster, 
ClusterData.builder().serviceUrl("http://"; + 
pulsar1.getAdvertisedAddress()).build());
+        admin1.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(namespace);
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
+                .create();
+        ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) 
((ModularLoadManagerWrapper) pulsar1
+                .getLoadManager().get()).getLoadManager();
+        pulsar1.getBrokerService().updateRates();
+        loadManager.updateAll();
+
+        // test1: no isolation policy
+        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, 
bundle, primaryHost));
+
+        // test2: as isolation policy, there are not another broker to load 
the bundle.
+        String newPolicyJsonTemplate = 
"{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
+                + 
"\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
+        String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, 
broker1Address,broker1Address, 1);
+        String newPolicyName = "my-ns-isolation-policies";
+        ObjectMapper jsonMapper = ObjectMapperFactory.create();
+        NamespaceIsolationDataImpl nsPolicyData = 
jsonMapper.readValue(newPolicyJson.getBytes(),
+                NamespaceIsolationDataImpl.class);
+        admin1.clusters().createNamespaceIsolationPolicy(cluster, 
newPolicyName, nsPolicyData);
+        assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace, 
bundle, broker1Address));
+
+        // test3: as isolation policy, there are another can load the bundle.
+        String newPolicyJson1 = String.format(newPolicyJsonTemplate, 
namespace, brokerAddress,brokerAddress, 1);
+        NamespaceIsolationDataImpl nsPolicyData1 = 
jsonMapper.readValue(newPolicyJson1.getBytes(),
+                NamespaceIsolationDataImpl.class);
+        admin1.clusters().updateNamespaceIsolationPolicy(cluster, 
newPolicyName, nsPolicyData1);
+        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, 
bundle, primaryHost));
+
+        producer.close();
+    }
+
     /**
      * It verifies that pulsar-service fails if load-manager tries to create 
ephemeral znode for broker which is already
      * created by other zk-session-id.

Reply via email to