This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0b315eb2b68 [improve][broker]add NamespacePolicies check before unload
bundle when doLoadShedding (#16476)
0b315eb2b68 is described below
commit 0b315eb2b688bc84c1062ef260bdb020304f9194
Author: lixinyang <[email protected]>
AuthorDate: Thu Jul 21 10:45:34 2022 +0800
[improve][broker]add NamespacePolicies check before unload bundle when
doLoadShedding (#16476)
* add NamespacePolicies check before unload bundle when doLoadShedding
* fix typos
Co-authored-by: nicklixinyang <[email protected]>
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 18 ++++++++
.../loadbalance/ModularLoadManagerImplTest.java | 51 ++++++++++++++++++++++
2 files changed, 69 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 5cb30eb6ad5..8c0df048de7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -656,6 +656,10 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
bundles.forEach(bundle -> {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if (!shouldNamespacePoliciesUnload(namespaceName,
bundleRange, broker)) {
+ return;
+ }
+
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
@@ -696,6 +700,20 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
this.bundleUnloadMetrics.set(metrics);
}
+ public boolean shouldNamespacePoliciesUnload(String namespace, String
bundle, String currentBroker) {
+ synchronized (brokerCandidateCache) {
+ brokerCandidateCache.clear();
+ ServiceUnitId serviceUnit =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespace, bundle);
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies,
brokerCandidateCache,
+ getAvailableBrokers(), brokerTopicLoadingPredicate);
+
+ // If only the current broker satisfies the NamespacePolicies, we should not
unload.
+ brokerCandidateCache.remove(currentBroker);
+ return !brokerCandidateCache.isEmpty(); // true only if a candidate other than currentBroker is left
+ }
+ }
+
public boolean shouldAntiAffinityNamespaceUnload(String namespace, String
bundle, String currentBroker) {
try {
Optional<LocalPolicies> nsPolicies =
pulsar.getPulsarResources().getLocalPolicies()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 12f95a2b796..3169fbb32fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -58,6 +59,8 @@ import
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -578,6 +581,54 @@ public class ModularLoadManagerImplTest {
}
+ @Test
+ public void testLoadSheddingWithNamespaceIsolationPolicies() throws
Exception {
+
+ final String cluster = "use";
+ final String tenant = "my-tenant";
+ final String namespace = "my-tenant/use/my-ns";
+ final String bundle = "0x00000000_0xffffffff";
+ final String brokerAddress = pulsar1.getAdvertisedAddress();
+ final String broker1Address = pulsar1.getAdvertisedAddress() + 1; // string concat: appends "1" to get an address that differs from brokerAddress
+
+ admin1.clusters().createCluster(cluster,
ClusterData.builder().serviceUrl("http://" +
pulsar1.getAdvertisedAddress()).build());
+ admin1.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(cluster)));
+ admin1.namespaces().createNamespace(namespace);
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
+ .create();
+ ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl)
((ModularLoadManagerWrapper) pulsar1
+ .getLoadManager().get()).getLoadManager();
+ pulsar1.getBrokerService().updateRates();
+ loadManager.updateAll();
+
+ // test1: no isolation policy
+ assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,
bundle, primaryHost));
+
+ // test2: with the isolation policy, there is no other broker that can load
the bundle.
+ String newPolicyJsonTemplate =
"{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
+ +
"\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
+ String newPolicyJson = String.format(newPolicyJsonTemplate, namespace,
broker1Address,broker1Address, 1);
+ String newPolicyName = "my-ns-isolation-policies";
+ ObjectMapper jsonMapper = ObjectMapperFactory.create();
+ NamespaceIsolationDataImpl nsPolicyData =
jsonMapper.readValue(newPolicyJson.getBytes(),
+ NamespaceIsolationDataImpl.class);
+ admin1.clusters().createNamespaceIsolationPolicy(cluster,
newPolicyName, nsPolicyData);
+ assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace,
bundle, broker1Address));
+
+ // test3: with the isolation policy, there is another broker that can load the bundle.
+ String newPolicyJson1 = String.format(newPolicyJsonTemplate,
namespace, brokerAddress,brokerAddress, 1);
+ NamespaceIsolationDataImpl nsPolicyData1 =
jsonMapper.readValue(newPolicyJson1.getBytes(),
+ NamespaceIsolationDataImpl.class);
+ admin1.clusters().updateNamespaceIsolationPolicy(cluster,
newPolicyName, nsPolicyData1);
+ assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,
bundle, primaryHost));
+
+ producer.close();
+ }
+
/**
* It verifies that pulsar-service fails if load-manager tries to create
ephemeral znode for broker which is already
* created by other zk-session-id.