This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3604c67 make namespaces policy update take effect on time (#8976)
3604c67 is described below
commit 3604c67c7196a4180f93ff441e105ea146584bfe
Author: Jia Zhai <[email protected]>
AuthorDate: Mon Dec 21 13:59:17 2020 +0800
make namespaces policy update take effect on time (#8976)
### Motivation
The change of namespaces isolation policy takes effect only when
load-manager re-assign the bundles to brokers again.
This change tries to make the isolation policy takes effect on time.
### Modifications
- change setNamespaceIsolationPolicy method into async.
- add parameter to enable this feature:
enableNamespaceIsolationUpdateOnTime.
- add test to cover this feature.
### Verifying this change
tests passed
---
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++
.../pulsar/broker/admin/impl/ClustersBase.java | 113 +++++++++++++++++++--
.../pulsar/broker/loadbalance/LoadManager.java | 8 ++
.../broker/loadbalance/ModularLoadManager.java | 7 ++
.../loadbalance/impl/ModularLoadManagerImpl.java | 9 +-
.../impl/ModularLoadManagerWrapper.java | 5 +
.../apache/pulsar/broker/admin/AdminApiTest2.java | 57 +++++++++++
.../org/apache/pulsar/broker/admin/AdminTest.java | 3 +-
8 files changed, 199 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 97799cd..d065c85 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -882,6 +882,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "List of interceptors for entry metadata.")
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable namespaceIsolation policy update take effect ontime or
not," +
+ " if set to ture, then the related namespaces will be unloaded
after reset policy to make it take effect."
+ )
+ private boolean enableNamespaceIsolationUpdateOnTime = false;
+
/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 5e9dfbf..d2714ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -44,12 +46,16 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -58,6 +64,7 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -675,6 +682,7 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setNamespaceIsolationPolicy(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(
value = "The cluster name",
required = true
@@ -690,14 +698,16 @@ public class ClustersBase extends AdminResource {
required = true
)
NamespaceIsolationData policyData
- ) throws Exception {
+ ) {
validateSuperUserAccess();
validateClusterExists(cluster);
validatePoliciesReadOnlyAccess();
+ String jsonInput = null;
try {
// validate the policy data before creating the node
policyData.validate();
+ jsonInput =
ObjectMapperFactory.create().writeValueAsString(policyData);
String nsIsolationPolicyPath = path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPoliciesCache()
@@ -715,24 +725,113 @@ public class ClustersBase extends AdminResource {
-1);
// make sure that the cache content will be refreshed for the next
read access
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
+
+ // whether or not make the isolation update on time.
+ if
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
+ filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
} catch (IllegalArgumentException iae) {
log.info("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
clientAppId(), cluster, policyName, iae);
- String jsonInput =
ObjectMapperFactory.create().writeValueAsString(policyData);
- throw new RestException(Status.BAD_REQUEST,
- "Invalid format of input policy data. policy: " +
policyName + "; data: " + jsonInput);
+ asyncResponse.resume(new RestException(Status.BAD_REQUEST,
+ "Invalid format of input policy data. policy: " +
policyName + "; data: " + jsonInput));
} catch (KeeperException.NoNodeException nne) {
log.warn("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
- throw new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + "
does not exist");
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "NamespaceIsolationPolicies for cluster " + cluster + "
does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
policyName, e);
- throw new RestException(e);
+ asyncResponse.resume(new RestException(e));
}
}
+ // get matched namespaces; call unload for each namespaces;
+ private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
+ NamespaceIsolationData
policyData) throws Exception {
+ Namespaces namespaces = pulsar().getAdminClient().namespaces();
+
+ List<String> nssToUnload = Lists.newArrayList();
+
+ pulsar().getAdminClient().tenants().getTenantsAsync()
+ .whenComplete((tenants, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to get tenants when
setNamespaceIsolationPolicy.", clientAppId(), ex);
+ return;
+ }
+ AtomicInteger tenantsNumber = new
AtomicInteger(tenants.size());
+ // get all tenants now, for each tenants, get its namespaces
+ tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant)
+ .whenComplete((nss, e) -> {
+ int leftTenantsToHandle =
tenantsNumber.decrementAndGet();
+ if (ex != null) {
+ log.error("[{}] Failed to get namespaces for
tenant {} when setNamespaceIsolationPolicy.",
+ clientAppId(), tenant, ex);
+
+ if (leftTenantsToHandle == 0) {
+ unloadMatchedNamespacesList(asyncResponse,
nssToUnload, namespaces);
+ }
+
+ return;
+ }
+
+ AtomicInteger nssNumber = new
AtomicInteger(nss.size());
+
+ // get all namespaces for this tenant now.
+ nss.forEach(namespaceName -> {
+ int leftNssToHandle = nssNumber.decrementAndGet();
+
+ // if namespace match any policy regex, add it to
ns list to be unload.
+ if (policyData.namespaces.stream()
+ .anyMatch(nsnameRegex ->
namespaceName.matches(nsnameRegex))) {
+ nssToUnload.add(namespaceName);
+ }
+
+ // all the tenants & namespaces get filtered.
+ if (leftNssToHandle == 0 && leftTenantsToHandle ==
0) {
+ unloadMatchedNamespacesList(asyncResponse,
nssToUnload, namespaces);
+ }
+ });
+ }));
+ });
+ }
+
+ private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
+ List<String> nssToUnload,
+ Namespaces namespaces) {
+ if (nssToUnload.size() == 0) {
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
+
+ List<CompletableFuture<Void>> futures = nssToUnload.stream()
+ .map(namespaceName -> namespaces.unloadAsync(namespaceName))
+ .collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
+ if (exception != null) {
+ log.error("[{}] Failed to unload namespace while
setNamespaceIsolationPolicy.",
+ clientAppId(), exception);
+ asyncResponse.resume(new RestException(exception));
+ return;
+ }
+
+ try {
+ // write load info to load manager to make the load happens
fast
+
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to writeLoadReportOnZookeeper.",
clientAppId(), e);
+ }
+
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ });
+ }
+
private boolean createZnodeIfNotExist(String path, Optional<Object> value)
throws KeeperException, InterruptedException {
// create persistent node on ZooKeeper
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 11b3e2e..7be1898 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -81,6 +81,14 @@ public interface LoadManager {
void writeLoadReportOnZookeeper() throws Exception;
/**
+ * Publish the current load report on ZK, forced or not.
+ * By default rely on method writeLoadReportOnZookeeper().
+ */
+ default void writeLoadReportOnZookeeper(boolean force) throws Exception {
+ writeLoadReportOnZookeeper();
+ }
+
+ /**
* Update namespace bundle resource quota on ZK.
*/
void writeResourceQuotasToZooKeeper() throws Exception;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 8a063a3..a778cdb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -95,6 +95,13 @@ public interface ModularLoadManager {
void writeBrokerDataOnZooKeeper();
/**
+ * As any broker, write the local broker data to ZooKeeper, forced or not.
+ */
+ default void writeBrokerDataOnZooKeeper(boolean force) {
+ writeBrokerDataOnZooKeeper();
+ }
+
+ /**
* As the leader broker, write bundle data aggregated from all brokers to
ZooKeeper.
*/
void writeBundleDataOnZooKeeper();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c2e7cd1..9df80e2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -992,16 +992,21 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, ZooKeeperCach
*/
@Override
public void writeBrokerDataOnZooKeeper() {
+ writeBrokerDataOnZooKeeper(false);
+ }
+
+ @Override
+ public void writeBrokerDataOnZooKeeper(boolean force) {
try {
updateLocalBrokerData();
- if (needBrokerDataUpdate()) {
+ if (needBrokerDataUpdate() || force) {
localData.setLastUpdate(System.currentTimeMillis());
try {
zkClient.setData(brokerZnodePath,
localData.getJsonBytes(), -1);
} catch (KeeperException.NoNodeException e) {
ZkUtils.createFullPathOptimistic(zkClient,
brokerZnodePath, localData.getJsonBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// Clear deltas.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 48944ab6..b8ee073 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -115,6 +115,11 @@ public class ModularLoadManagerWrapper implements
LoadManager {
}
@Override
+ public void writeLoadReportOnZookeeper(boolean force) {
+ loadManager.writeBrokerDataOnZooKeeper(force);
+ }
+
+ @Override
public void writeResourceQuotasToZooKeeper() {
loadManager.writeBundleDataOnZooKeeper();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 9690f7e..58357a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -100,6 +100,7 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
+ conf.setEnableNamespaceIsolationUpdateOnTime(true);
super.internalSetup();
// create otherbroker to test redirect on calls that need
@@ -908,6 +909,62 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
assertFalse(isolationData.isPrimary);
}
+ // create 1 namespace:
+ // 0. without isolation policy configured, lookup will success.
+ // 1. with matched isolation broker configured and matched, lookup will
success.
+ // 2. update isolation policy, without broker matched, lookup will fail.
+ @Test
+ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws
Exception {
+ String brokerName = pulsar.getAdvertisedAddress();
+ String ns1Name = "prop-xyz/test_ns1_iso_" + System.currentTimeMillis();
+ admin.namespaces().createNamespace(ns1Name, Sets.newHashSet("test"));
+
+ // 0. without isolation policy configured, lookup will success.
+ String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
+ assertTrue(brokerUrl.contains(brokerName));
+ log.info("0 get lookup url {}", brokerUrl);
+
+ // create
+ String policyName1 = "policy-1";
+ String cluster = pulsar.getConfiguration().getClusterName();
+ String namespaceRegex = ns1Name;
+ NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
+ nsPolicyData1.namespaces = new ArrayList<String>();
+ nsPolicyData1.namespaces.add(ns1Name);
+ nsPolicyData1.primary = new ArrayList<String>();
+ nsPolicyData1.primary.add(brokerName + ".*");
+ nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
+ nsPolicyData1.auto_failover_policy.policy_type =
AutoFailoverPolicyType.min_available;
+ nsPolicyData1.auto_failover_policy.parameters = new HashMap<String,
String>();
+ nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
+ nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold",
"100");
+ admin.clusters().createNamespaceIsolationPolicyAsync(cluster,
policyName1, nsPolicyData1).get();
+
+ // 1. with matched isolation broker configured and matched, lookup
will success.
+ brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
+ assertTrue(brokerUrl.contains(brokerName));
+ log.info(" 1 get lookup url {}", brokerUrl);
+
+ // 2. update isolation policy, without broker matched, lookup will
fail.
+ nsPolicyData1.primary = new ArrayList<String>();
+ nsPolicyData1.primary.add(brokerName + "not_match");
+ admin.clusters().updateNamespaceIsolationPolicyAsync(cluster,
policyName1, nsPolicyData1).get();
+
+ try {
+ admin.lookups().lookupTopic(ns1Name + "/topic3");
+ } catch (Exception e) {
+ // expected lookup fail, because no brokers matched the policy.
+ log.info(" 2 expected fail lookup");
+ }
+
+ try {
+ admin.lookups().lookupTopic(ns1Name + "/topic1");
+ } catch (Exception e) {
+ // expected lookup fail, because no brokers matched the policy.
+ log.info(" 22 expected fail lookup");
+ }
+ }
+
@Test
public void clustersList() throws PulsarAdminException {
final String cluster = pulsar.getConfiguration().getClusterName();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index d30b2e5..c80e727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -284,7 +284,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
policyData.auto_failover_policy.parameters = new HashMap<String,
String>();
policyData.auto_failover_policy.parameters.put("min_limit", "1");
policyData.auto_failover_policy.parameters.put("usage_threshold",
"90");
- clusters.setNamespaceIsolationPolicy("use", "policy1", policyData);
+ AsyncResponse response = mock(AsyncResponse.class);
+ clusters.setNamespaceIsolationPolicy(response,"use", "policy1",
policyData);
clusters.getNamespaceIsolationPolicies("use");
try {