This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3604c67  make namespaces policy update take effect on time (#8976)
3604c67 is described below

commit 3604c67c7196a4180f93ff441e105ea146584bfe
Author: Jia Zhai <[email protected]>
AuthorDate: Mon Dec 21 13:59:17 2020 +0800

    make namespaces policy update take effect on time (#8976)
    
    ### Motivation
    
    A change to a namespace isolation policy takes effect only when the
    load manager re-assigns the bundles to brokers.
    This change makes an isolation policy update take effect promptly ("on time").
    
    ### Modifications
    
    - make the setNamespaceIsolationPolicy method asynchronous.
    - add a configuration parameter to enable this feature: enableNamespaceIsolationUpdateOnTime.
    - add a test to cover this feature.
    
    ### Verifying this change
    tests passed
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 ++
 .../pulsar/broker/admin/impl/ClustersBase.java     | 113 +++++++++++++++++++--
 .../pulsar/broker/loadbalance/LoadManager.java     |   8 ++
 .../broker/loadbalance/ModularLoadManager.java     |   7 ++
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   9 +-
 .../impl/ModularLoadManagerWrapper.java            |   5 +
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  57 +++++++++++
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   3 +-
 8 files changed, 199 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 97799cd..d065c85 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -882,6 +882,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             doc = "List of interceptors for entry metadata.")
     private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();
 
+    // Off by default: without it, an isolation policy update only takes effect once the
+    // load manager re-assigns the affected bundles.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable namespaceIsolation policy update take effect on time or not," +
+            " if set to true, then the related namespaces will be unloaded after reset policy to make it take effect."
+    )
+    private boolean enableNamespaceIsolationUpdateOnTime = false;
+
     /***** --- TLS --- ****/
     @FieldContext(
         category = CATEGORY_TLS,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 5e9dfbf..d2714ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -44,12 +46,16 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -58,6 +64,7 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -675,6 +682,7 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error.")
     })
     public void setNamespaceIsolationPolicy(
+        @Suspended final AsyncResponse asyncResponse,
         @ApiParam(
             value = "The cluster name",
             required = true
@@ -690,14 +698,16 @@ public class ClustersBase extends AdminResource {
             required = true
         )
         NamespaceIsolationData policyData
-    ) throws Exception {
+    ) {
         validateSuperUserAccess();
         validateClusterExists(cluster);
         validatePoliciesReadOnlyAccess();
 
+        String jsonInput = null;
         try {
             // validate the policy data before creating the node
             policyData.validate();
+            jsonInput = 
ObjectMapperFactory.create().writeValueAsString(policyData);
 
             String nsIsolationPolicyPath = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
             NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
@@ -715,24 +725,113 @@ public class ClustersBase extends AdminResource {
                     -1);
             // make sure that the cache content will be refreshed for the next 
read access
             
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
+
+            // whether or not make the isolation update on time.
+            if 
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
+                filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+                return;
+            }
         } catch (IllegalArgumentException iae) {
             log.info("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
                     clientAppId(), cluster, policyName, iae);
-            String jsonInput = 
ObjectMapperFactory.create().writeValueAsString(policyData);
-            throw new RestException(Status.BAD_REQUEST,
-                    "Invalid format of input policy data. policy: " + 
policyName + "; data: " + jsonInput);
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST,
+                    "Invalid format of input policy data. policy: " + 
policyName + "; data: " + jsonInput));
         } catch (KeeperException.NoNodeException nne) {
             log.warn("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
                     cluster);
-            throw new RestException(Status.NOT_FOUND,
-                    "NamespaceIsolationPolicies for cluster " + cluster + " 
does not exist");
+            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                    "NamespaceIsolationPolicies for cluster " + cluster + " 
does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to update 
clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
                     policyName, e);
-            throw new RestException(e);
+            asyncResponse.resume(new RestException(e));
         }
     }
 
+    /**
+     * Finds every namespace whose name matches one of the regexes in {@code policyData.namespaces}
+     * and passes the resulting list to {@code unloadMatchedNamespacesList}, which resumes
+     * {@code asyncResponse}.
+     *
+     * <p>Tenants and their namespaces are listed asynchronously through the admin client. A tenant
+     * whose namespaces cannot be listed is logged and skipped; a failure to list the tenants
+     * themselves resumes the response with an error.
+     *
+     * @param asyncResponse the suspended REST response to resume once filtering/unloading is done
+     * @param policyData    the isolation policy whose {@code namespaces} regexes select what to unload
+     * @throws Exception propagated from {@code pulsar().getAdminClient()}
+     */
+    private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
+                                                  NamespaceIsolationData policyData) throws Exception {
+        Namespaces namespaces = pulsar().getAdminClient().namespaces();
+
+        pulsar().getAdminClient().tenants().getTenantsAsync()
+            .whenComplete((tenants, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Failed to get tenants when setNamespaceIsolationPolicy.", clientAppId(), ex);
+                    // The request is suspended: it must be resumed here or the caller hangs.
+                    asyncResponse.resume(new RestException(ex));
+                    return;
+                }
+
+                // One listing future per tenant. A failed listing is logged and mapped to an
+                // empty list so that the remaining tenants are still processed.
+                List<CompletableFuture<List<String>>> listings = tenants.stream()
+                    .map(tenant -> namespaces.getNamespacesAsync(tenant)
+                        .exceptionally(e -> {
+                            log.error("[{}] Failed to get namespaces for tenant {} when setNamespaceIsolationPolicy.",
+                                clientAppId(), tenant, e);
+                            List<String> none = Lists.newArrayList();
+                            return none;
+                        }))
+                    .collect(Collectors.toList());
+
+                // Runs once all listings are done; also fires immediately when there are no
+                // tenants at all, so the response is resumed in that case too.
+                FutureUtil.waitForAll(listings).whenComplete((ignore, e) -> {
+                    try {
+                        // All futures are complete here, so join() does not block. The list is
+                        // built in a single thread instead of being mutated from many callbacks.
+                        List<String> nssToUnload = listings.stream()
+                            .flatMap(listing -> listing.join().stream())
+                            .filter(namespaceName -> policyData.namespaces.stream()
+                                .anyMatch(nsnameRegex -> namespaceName.matches(nsnameRegex)))
+                            .collect(Collectors.toList());
+
+                        unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
+                    } catch (RuntimeException re) {
+                        // e.g. an invalid regex in the policy; answer instead of leaving the request hanging.
+                        log.error("[{}] Failed to filter namespaces when setNamespaceIsolationPolicy.",
+                            clientAppId(), re);
+                        asyncResponse.resume(new RestException(re));
+                    }
+                });
+            });
+    }
+
+    /**
+     * Unloads every namespace named in {@code nssToUnload} and then resumes {@code asyncResponse}.
+     *
+     * <p>An empty list resumes with "no content" right away. If any unload fails, the response is
+     * resumed with a {@link RestException}. Otherwise a forced load report write is attempted
+     * (a failure there is only logged) and the response is resumed with "no content".
+     *
+     * @param asyncResponse the suspended REST response to resume
+     * @param nssToUnload   names of the namespaces to unload
+     * @param namespaces    admin client handle used to issue the unload calls
+     */
+    private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
+                                             List<String> nssToUnload,
+                                             Namespaces namespaces) {
+        if (nssToUnload.isEmpty()) {
+            // nothing matched the policy, so there is nothing to unload
+            asyncResponse.resume(Response.noContent().build());
+            return;
+        }
+
+        List<CompletableFuture<Void>> pendingUnloads = nssToUnload.stream()
+            .map(ns -> namespaces.unloadAsync(ns))
+            .collect(Collectors.toList());
+
+        FutureUtil.waitForAll(pendingUnloads).whenComplete((ignored, failure) -> {
+            if (failure == null) {
+                try {
+                    // force a load report write so the load manager sees the change sooner
+                    pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+                } catch (Exception e) {
+                    log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
+                }
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                log.error("[{}] Failed to unload namespace while setNamespaceIsolationPolicy.",
+                    clientAppId(), failure);
+                asyncResponse.resume(new RestException(failure));
+            }
+        });
+    }
+
     private boolean createZnodeIfNotExist(String path, Optional<Object> value)
             throws KeeperException, InterruptedException {
         // create persistent node on ZooKeeper
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 11b3e2e..7be1898 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -81,6 +81,14 @@ public interface LoadManager {
     void writeLoadReportOnZookeeper() throws Exception;
 
     /**
+     * Publish the current load report on ZK, optionally forcing the write.
+     *
+     * <p>The default implementation ignores {@code force} and simply delegates to
+     * {@link #writeLoadReportOnZookeeper()}.
+     *
+     * @param force intended for implementations that override this method;
+     *              unused by this default implementation
+     * @throws Exception whatever {@link #writeLoadReportOnZookeeper()} throws
+     */
+    default void writeLoadReportOnZookeeper(boolean force) throws Exception {
+        writeLoadReportOnZookeeper();
+    }
+
+    /**
      * Update namespace bundle resource quota on ZK.
      */
     void writeResourceQuotasToZooKeeper() throws Exception;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 8a063a3..a778cdb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -95,6 +95,13 @@ public interface ModularLoadManager {
     void writeBrokerDataOnZooKeeper();
 
     /**
+     * As any broker, write the local broker data to ZooKeeper, optionally forcing the write.
+     *
+     * <p>The default implementation ignores {@code force} and simply delegates to
+     * {@link #writeBrokerDataOnZooKeeper()}.
+     *
+     * @param force intended for implementations that override this method;
+     *              unused by this default implementation
+     */
+    default void writeBrokerDataOnZooKeeper(boolean force) {
+        writeBrokerDataOnZooKeeper();
+    }
+
+    /**
      * As the leader broker, write bundle data aggregated from all brokers to 
ZooKeeper.
      */
     void writeBundleDataOnZooKeeper();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c2e7cd1..9df80e2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -992,16 +992,21 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
      */
     @Override
     public void writeBrokerDataOnZooKeeper() {
+        writeBrokerDataOnZooKeeper(false);
+    }
+
+    @Override
+    public void writeBrokerDataOnZooKeeper(boolean force) {
         try {
             updateLocalBrokerData();
-            if (needBrokerDataUpdate()) {
+            if (needBrokerDataUpdate() || force) {
                 localData.setLastUpdate(System.currentTimeMillis());
 
                 try {
                     zkClient.setData(brokerZnodePath, 
localData.getJsonBytes(), -1);
                 } catch (KeeperException.NoNodeException e) {
                     ZkUtils.createFullPathOptimistic(zkClient, 
brokerZnodePath, localData.getJsonBytes(),
-                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                 }
 
                 // Clear deltas.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 48944ab6..b8ee073 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -115,6 +115,11 @@ public class ModularLoadManagerWrapper implements 
LoadManager {
     }
 
     @Override
+    public void writeLoadReportOnZookeeper(boolean force) {
+        // Delegate to the wrapped modular load manager, passing the force flag through unchanged.
+        loadManager.writeBrokerDataOnZooKeeper(force);
+    }
+
+    @Override
     public void writeResourceQuotasToZooKeeper() {
         loadManager.writeBundleDataOnZooKeeper();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 9690f7e..58357a8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -100,6 +100,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     @Override
     public void setup() throws Exception {
         conf.setLoadBalancerEnabled(true);
+        conf.setEnableNamespaceIsolationUpdateOnTime(true);
         super.internalSetup();
 
         // create otherbroker to test redirect on calls that need
@@ -908,6 +909,62 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         assertFalse(isolationData.isPrimary);
     }
 
+    // Creates one namespace and walks through three lookup scenarios:
+    //  0. no isolation policy configured: lookup succeeds.
+    //  1. isolation policy whose primary regex matches this broker: lookup succeeds.
+    //  2. policy updated so that no broker matches: lookup is expected to fail.
+    //
+    // NOTE(review): step 2 only logs when the lookup throws; nothing fails the test if the
+    // lookup unexpectedly succeeds, so the "update on time" behavior is not actually asserted.
+    @Test
+    public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
+        String brokerName = pulsar.getAdvertisedAddress();
+        // timestamp suffix keeps the namespace name unique per run
+        String ns1Name = "prop-xyz/test_ns1_iso_" + System.currentTimeMillis();
+        admin.namespaces().createNamespace(ns1Name, Sets.newHashSet("test"));
+
+        //  0. no isolation policy configured: lookup succeeds and points at this broker.
+        String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
+        assertTrue(brokerUrl.contains(brokerName));
+        log.info("0 get lookup url {}", brokerUrl);
+
+        // create an isolation policy that covers ns1Name and whose primary regex matches this broker
+        String policyName1 = "policy-1";
+        String cluster = pulsar.getConfiguration().getClusterName();
+        // NOTE(review): namespaceRegex is never read below; ns1Name is used directly instead.
+        String namespaceRegex = ns1Name;
+        NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
+        nsPolicyData1.namespaces = new ArrayList<String>();
+        nsPolicyData1.namespaces.add(ns1Name);
+        nsPolicyData1.primary = new ArrayList<String>();
+        nsPolicyData1.primary.add(brokerName + ".*");
+        nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
+        nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
+        nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, String>();
+        nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
+        nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100");
+        admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
+
+        //  1. policy's primary regex matches this broker: lookup still succeeds.
+        brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
+        assertTrue(brokerUrl.contains(brokerName));
+        log.info(" 1 get lookup url {}", brokerUrl);
+
+        //  2. replace the primary regex with one that matches no broker: lookup is expected to fail.
+        nsPolicyData1.primary = new ArrayList<String>();
+        nsPolicyData1.primary.add(brokerName + "not_match");
+        admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
+
+        // a topic that has not been looked up before
+        try {
+            admin.lookups().lookupTopic(ns1Name + "/topic3");
+        } catch (Exception e) {
+            // expected: the lookup fails because no broker matches the policy.
+            log.info(" 2 expected fail lookup");
+        }
+
+        // the topic that was looked up successfully in step 0
+        try {
+            admin.lookups().lookupTopic(ns1Name + "/topic1");
+        } catch (Exception e) {
+            // expected: the lookup fails because no broker matches the policy.
+            log.info(" 22 expected fail lookup");
+        }
+    }
+
     @Test
     public void clustersList() throws PulsarAdminException {
         final String cluster = pulsar.getConfiguration().getClusterName();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index d30b2e5..c80e727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -284,7 +284,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         policyData.auto_failover_policy.parameters = new HashMap<String, 
String>();
         policyData.auto_failover_policy.parameters.put("min_limit", "1");
         policyData.auto_failover_policy.parameters.put("usage_threshold", 
"90");
-        clusters.setNamespaceIsolationPolicy("use", "policy1", policyData);
+        AsyncResponse response = mock(AsyncResponse.class);
+        clusters.setNamespaceIsolationPolicy(response,"use", "policy1", 
policyData);
         clusters.getNamespaceIsolationPolicies("use");
 
         try {

Reply via email to