This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f9283dab56c [improve][admin] PIP-369 Introduce `unload` flag in
`ns-isolation-policy set` call (#23120)
f9283dab56c is described below
commit f9283dab56cc96a4202c98f37342ebc302acac3a
Author: Girish Sharma <[email protected]>
AuthorDate: Sun Sep 1 09:49:45 2024 +0530
[improve][admin] PIP-369 Introduce `unload` flag in `ns-isolation-policy
set` call (#23120)
Co-authored-by: Zixuan Liu <[email protected]>
(cherry picked from commit 8da3bf8322c536c495541c80926cdf9389612515)
---
.../pulsar/broker/admin/impl/ClustersBase.java | 54 +++++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 208 ++++++++++++++++++---
.../policies/data/NamespaceIsolationData.java | 4 +
...va => NamespaceIsolationPolicyUnloadScope.java} | 44 ++---
.../admin/cli/CmdNamespaceIsolationPolicy.java | 17 +-
.../common/policies/NamespaceIsolationPolicy.java | 6 +
.../policies/data/NamespaceIsolationDataImpl.java | 17 +-
.../impl/NamespaceIsolationPolicyImpl.java | 8 +
8 files changed, 302 insertions(+), 56 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 4fe8a01e679..132c99ce16b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -27,11 +27,13 @@ import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -65,6 +67,7 @@ import
org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
@@ -721,10 +724,13 @@ public class ClustersBase extends AdminResource {
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
.thenApply(__ -> new
NamespaceIsolationPolicies()))
).thenCompose(nsIsolationPolicies -> {
+ NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
+ .getPolicies().getOrDefault(policyName, null);
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
- .setIsolationDataAsync(cluster, old ->
nsIsolationPolicies.getPolicies());
- }).thenCompose(__ ->
filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
+ .setIsolationDataAsync(cluster, old ->
nsIsolationPolicies.getPolicies())
+ .thenApply(__ -> oldPolicy);
+ }).thenCompose(oldPolicy ->
filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
.thenAccept(__ -> {
log.info("[{}] Successful to update
clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
@@ -759,7 +765,13 @@ public class ClustersBase extends AdminResource {
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture<Void>
filterAndUnloadMatchedNamespaceAsync(String cluster,
-
NamespaceIsolationDataImpl policyData) {
+
NamespaceIsolationDataImpl policyData,
+
NamespaceIsolationDataImpl oldPolicy) {
+ // exit early if none of the namespaces need to be unloaded
+ if
(NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
+ return CompletableFuture.completedFuture(null);
+ }
+
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
@@ -768,6 +780,7 @@ public class ClustersBase extends AdminResource {
}
// compile regex patterns once
List<Pattern> namespacePatterns =
policyData.getNamespaces().stream().map(Pattern::compile).toList();
+ // TODO for 4.x, we should include both old and new namespace regex
pattern for unload `all_matching` option
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>>
filteredNamespacesForEachTenant = tenants.stream()
.map(tenant ->
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
@@ -793,6 +806,41 @@ public class ClustersBase extends AdminResource {
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
return CompletableFuture.completedFuture(null);
}
+ // If unload type is 'changed', we need to figure out a further
subset of namespaces whose placement might
+ // actually have been changed.
+
+ log.debug("Old policy: {} ; new policy: {}", oldPolicy,
policyData);
+ if (oldPolicy != null &&
NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope()))
{
+ // We also compare that the previous primary broker list is
same as current, in case all namespaces need
+ // to be placed again anyway.
+ if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(),
policyData.getPrimary())) {
+ // list is same, so we continue finding the changed
namespaces.
+
+ // We create a union regex list contains old + new regexes
+ Set<String> combinedNamespaces = new
HashSet<>(oldPolicy.getNamespaces());
+ combinedNamespaces.addAll(policyData.getNamespaces());
+ // We create a intersection of the old and new regexes.
These won't need to be unloaded
+ Set<String> commonNamespaces = new
HashSet<>(oldPolicy.getNamespaces());
+ commonNamespaces.retainAll(policyData.getNamespaces());
+
+ log.debug("combined regexes: {}; common regexes:{}",
combinedNamespaces, combinedNamespaces);
+
+ // Find the changed regexes (new - new ∩ old). TODO for
4.x, make this (new U old - new ∩ old)
+ combinedNamespaces.removeAll(commonNamespaces);
+
+ log.debug("changed regexes: {}", commonNamespaces);
+
+ // Now we further filter the filtered namespaces based on
this combinedNamespaces set
+ shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
+ .filter(name -> combinedNamespaces.stream()
+ .map(Pattern::compile)
+ .anyMatch(pattern ->
pattern.matcher(name).matches())
+ ).toList();
+
+ }
+ }
+ // unload type is either null or not in (changed, none), so we
proceed to unload all namespaces
+ // TODO - default in 4.x should become `changed`
List<CompletableFuture<Void>> futures =
shouldUnloadNamespaces.stream()
.map(namespaceName ->
adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 40e2ca8cce9..155994c814c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -53,6 +54,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
@@ -109,27 +111,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
-import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
-import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.EntryFilters;
-import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
-import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -3496,4 +3478,188 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
+
+ private NamespaceIsolationData
createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String>
namespaces,
+ List<String> primaryBrokers
+ ) {
+ // setup ns-isolation-policy in both the clusters.
+ Map<String, String> parameters1 = new HashMap<>();
+ parameters1.put("min_limit", "1");
+ parameters1.put("usage_threshold", "100");
+ List<String> nsRegexList = new ArrayList<>(namespaces);
+
+ return NamespaceIsolationData.builder()
+ // "prop-ig/ns1" is present in test cluster, policy set on
test2 should work
+ .namespaces(nsRegexList)
+ .primary(primaryBrokers)
+ .secondary(Collections.singletonList(""))
+ .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+ .policyType(AutoFailoverPolicyType.min_available)
+ .parameters(parameters1)
+ .build())
+ .unloadScope(scope)
+ .build();
+ }
+
+ private boolean allTopicsUnloaded(List<String> topics) {
+ for (String topic : topics) {
+ if
(pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void loadTopics(List<String> topics) throws PulsarClientException,
ExecutionException, InterruptedException {
+ // create a topic by creating a producer so that the topic is present
on the broker
+ for (String topic : topics) {
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ pulsar.getBrokerService().getTopicIfExists(topic).get();
+ }
+
+ // All namespaces are loaded onto broker. Assert that
+ for (String topic : topics) {
+
assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
+ }
+ }
+
+ /**
+ * Validates that the namespace isolation policy set and update is
unloading only the relevant namespaces based on
+ * the unload scope provided.
+ *
+ * @param topicType persistent or non persistent.
+ * @param policyName policy name.
+ * @param nsPrefix unique namespace prefix.
+ * @param totalNamespaces total namespaces to create. Only the end part.
Each namespace also gets a topic t1.
+ * @param initialScope unload scope while creating the policy.
+ * @param initialNamespaceRegex namespace regex while creating the policy.
+ * @param initialLoadedNS expected namespaces to be still loaded after the
policy create call. Remaining namespaces
+ * will be asserted to be unloaded within 20
seconds.
+ * @param updatedScope unload scope while updating the policy.
+ * @param updatedNamespaceRegex namespace regex while updating the policy.
+ * @param updatedLoadedNS expected namespaces to be loaded after policy
update call. Remaining namespaces will be
+ * asserted to be unloaded within 20 seconds.
+ * @throws PulsarAdminException
+ * @throws PulsarClientException
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ private void testIsolationPolicyUnloadsNSWithScope(String topicType,
String policyName, String nsPrefix,
+ List<String>
totalNamespaces,
+
NamespaceIsolationPolicyUnloadScope initialScope,
+ List<String>
initialNamespaceRegex, List<String> initialLoadedNS,
+
NamespaceIsolationPolicyUnloadScope updatedScope,
+ List<String>
updatedNamespaceRegex, List<String> updatedLoadedNS,
+ List<String>
updatedBrokerRegex)
+ throws PulsarAdminException, PulsarClientException,
ExecutionException, InterruptedException {
+
+ // Create all namespaces
+ List<String> allTopics = new ArrayList<>();
+ for (String namespacePart: totalNamespaces) {
+ admin.namespaces().createNamespace(nsPrefix + namespacePart,
Set.of("test"));
+ allTopics.add(topicType + "://" + nsPrefix + namespacePart +
"/t1");
+ }
+ // Load all topics so that they are present. Assume topic t1 under
each namespace
+ loadTopics(allTopics);
+
+ // Create the policy
+ NamespaceIsolationData nsPolicyData1 = createPolicyData(
+ initialScope, initialNamespaceRegex,
Collections.singletonList(".*")
+ );
+ admin.clusters().createNamespaceIsolationPolicy("test", policyName,
nsPolicyData1);
+
+ List<String> initialLoadedTopics = new ArrayList<>();
+ for (String namespacePart: initialLoadedNS) {
+ initialLoadedTopics.add(topicType + "://" + nsPrefix +
namespacePart + "/t1");
+ }
+
+ List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
+ initialUnloadedTopics.removeAll(initialLoadedTopics);
+
+ // Assert that all topics (and thus ns) not under initialLoadedNS
namespaces are unloaded
+ if (initialUnloadedTopics.isEmpty()) {
+ // Just wait a bit to ensure we don't miss lazy unloading of
topics we expect not to unload
+ TimeUnit.SECONDS.sleep(5);
+ } else {
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> allTopicsUnloaded(initialUnloadedTopics));
+ }
+ // Assert that all topics under initialLoadedNS are still present
+ initialLoadedTopics.forEach(t ->
assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+ // Load the topics again
+ loadTopics(allTopics);
+
+ // Update policy using updatedScope with updated namespace regex
+ nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex,
updatedBrokerRegex);
+ admin.clusters().updateNamespaceIsolationPolicy("test", policyName,
nsPolicyData1);
+
+ List<String> updatedLoadedTopics = new ArrayList<>();
+ for (String namespacePart : updatedLoadedNS) {
+ updatedLoadedTopics.add(topicType + "://" + nsPrefix +
namespacePart + "/t1");
+ }
+
+ List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
+ updatedUnloadedTopics.removeAll(updatedLoadedTopics);
+
+ // Assert that all topics (and thus ns) not under updatedLoadedNS
namespaces are unloaded
+ if (updatedUnloadedTopics.isEmpty()) {
+ // Just wait a bit to ensure we don't miss lazy unloading of
topics we expect not to unload
+ TimeUnit.SECONDS.sleep(5);
+ } else {
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> allTopicsUnloaded(updatedUnloadedTopics));
+ }
+ // Assert that all topics under updatedLoadedNS are still present
+ updatedLoadedTopics.forEach(t ->
assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithAllScope(final String
topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1",
"b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1",
"b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*",
".*-unload-test-c.*"), List.of("b1", "b2"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithChangedScope(final String
topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-changed", nsPrefix, List.of("a1", "a2",
"b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1",
"b2", "c1"),
+ changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"),
List.of("a1", "a2", "b1", "b2"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithNoneScope(final String
topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1",
"b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1",
"b2", "c1"),
+ none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"),
List.of("a1", "a2", "b1", "b2", "c1"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String
topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ // As per changed flag, only c1 should unload, but due to primary
change, both a* and c* will.
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-primary-changed", nsPrefix, List.of("a1",
"a2", "b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1",
"b2", "c1"),
+ changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"),
List.of("b1", "b2"),
+ List.of(".*", "broker.*")
+ );
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
index aa48e69c145..4f367f72fda 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
@@ -31,6 +31,8 @@ public interface NamespaceIsolationData {
AutoFailoverPolicyData getAutoFailoverPolicy();
+ NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
void validate();
interface Builder {
@@ -42,6 +44,8 @@ public interface NamespaceIsolationData {
Builder autoFailoverPolicy(AutoFailoverPolicyData
autoFailoverPolicyData);
+ Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);
+
NamespaceIsolationData build();
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
similarity index 53%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
index aa48e69c145..2edeac45630 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
@@ -18,34 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.List;
-import org.apache.pulsar.client.admin.utils.ReflectionUtils;
-
-public interface NamespaceIsolationData {
-
- List<String> getNamespaces();
-
- List<String> getPrimary();
-
- List<String> getSecondary();
-
- AutoFailoverPolicyData getAutoFailoverPolicy();
-
- void validate();
-
- interface Builder {
- Builder namespaces(List<String> namespaces);
-
- Builder primary(List<String> primary);
-
- Builder secondary(List<String> secondary);
-
- Builder autoFailoverPolicy(AutoFailoverPolicyData
autoFailoverPolicyData);
-
- NamespaceIsolationData build();
- }
-
- static Builder builder() {
- return
ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl");
+/**
+ * The type of unload to perform while setting the isolation policy.
+ */
+public enum NamespaceIsolationPolicyUnloadScope {
+ all_matching, // unloads all matching namespaces as per new regex
+ none, // unloads no namespaces
+ changed; // unloads only the namespaces which are newly added or
removed from the regex list
+
+ public static NamespaceIsolationPolicyUnloadScope fromString(String
unloadScopeString) {
+ for (NamespaceIsolationPolicyUnloadScope unloadScope :
NamespaceIsolationPolicyUnloadScope.values()) {
+ if (unloadScope.toString().equalsIgnoreCase(unloadScopeString)) {
+ return unloadScope;
+ }
+ }
+ return null;
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
index e9896decd8c..0f5f6b211a5 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
@@ -32,6 +32,7 @@ import
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;
@@ -73,10 +74,19 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
required = true, split = ",")
private Map<String, String> autoFailoverPolicyParams;
+ @Option(names = "--unload-scope", description = "configure the type of
unload to do -"
+ + " ['all_matching', 'none', 'changed'] namespaces. By
default, all namespaces matching the namespaces"
+ + " regex will be unloaded and placed again. You can choose to
not unload any namespace while setting"
+ + " this new policy by choosing `none` or choose to unload
only the namespaces whose placement will"
+ + " actually change. If you chose 'none', you will need to
manually unload the namespaces for them to"
+ + " be placed correctly, or wait till some namespaces get load
balanced automatically based on load"
+ + " shedding configurations.")
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
+
void run() throws PulsarAdminException {
// validate and create the POJO
NamespaceIsolationData namespaceIsolationData =
createNamespaceIsolationData(namespaces, primary, secondary,
- autoFailoverPolicyTypeName, autoFailoverPolicyParams);
+ autoFailoverPolicyTypeName, autoFailoverPolicyParams,
unloadScope);
getAdmin().clusters().createNamespaceIsolationPolicy(clusterName,
policyName, namespaceIsolationData);
}
@@ -167,7 +177,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
List<String>
primary,
List<String>
secondary,
String
autoFailoverPolicyTypeName,
- Map<String,
String> autoFailoverPolicyParams) {
+ Map<String,
String> autoFailoverPolicyParams,
+
NamespaceIsolationPolicyUnloadScope unload) {
// validate
namespaces = validateList(namespaces);
@@ -234,6 +245,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
throw new ParameterException("Unknown auto failover policy type
specified : " + autoFailoverPolicyTypeName);
}
+ nsIsolationDataBuilder.unloadScope(unload);
+
return nsIsolationDataBuilder.build();
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
index bd28d30d4ce..52480d91eef 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.SortedSet;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BrokerStatus;
+import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
/**
* Namespace isolation policy.
@@ -43,6 +44,11 @@ public interface NamespaceIsolationPolicy {
*/
List<String> getSecondaryBrokers();
+ /**
+ * Get the unload scope for the policy set call.
+ */
+ NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
/**
* Get the list of primary brokers for the namespace according to the
policy.
*
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
index bdb51f63f89..1e72f0e50ee 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
@@ -75,6 +75,15 @@ public class NamespaceIsolationDataImpl implements
NamespaceIsolationData {
@JsonProperty("auto_failover_policy")
private AutoFailoverPolicyData autoFailoverPolicy;
+ @ApiModelProperty(
+ name = "unload_scope",
+ value = "The type of unload to perform while applying the new
isolation policy.",
+ example = "'all_matching' (default) for unloading all matching
namespaces. 'none' for not unloading "
+ + "any namespace. 'changed' for unloading only the
namespaces whose placement is actually changing"
+ )
+ @JsonProperty("unload_scope")
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
+
public static NamespaceIsolationDataImplBuilder builder() {
return new NamespaceIsolationDataImplBuilder();
}
@@ -106,6 +115,7 @@ public class NamespaceIsolationDataImpl implements
NamespaceIsolationData {
private List<String> primary = new ArrayList<>();
private List<String> secondary = new ArrayList<>();
private AutoFailoverPolicyData autoFailoverPolicy;
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
public NamespaceIsolationDataImplBuilder namespaces(List<String>
namespaces) {
this.namespaces = namespaces;
@@ -127,8 +137,13 @@ public class NamespaceIsolationDataImpl implements
NamespaceIsolationData {
return this;
}
+ public NamespaceIsolationDataImplBuilder
unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope) {
+ this.unloadScope = unloadScope;
+ return this;
+ }
+
public NamespaceIsolationDataImpl build() {
- return new NamespaceIsolationDataImpl(namespaces, primary,
secondary, autoFailoverPolicy);
+ return new NamespaceIsolationDataImpl(namespaces, primary,
secondary, autoFailoverPolicy, unloadScope);
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
index af3663869fa..440282f29cb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.policies.AutoFailoverPolicy;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerStatus;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
/**
* Implementation of the namespace isolation policy.
@@ -39,6 +40,7 @@ public class NamespaceIsolationPolicyImpl implements
NamespaceIsolationPolicy {
private List<String> primary;
private List<String> secondary;
private AutoFailoverPolicy autoFailoverPolicy;
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
private boolean matchNamespaces(String fqnn) {
for (String nsRegex : namespaces) {
@@ -64,6 +66,7 @@ public class NamespaceIsolationPolicyImpl implements
NamespaceIsolationPolicy {
this.primary = policyData.getPrimary();
this.secondary = policyData.getSecondary();
this.autoFailoverPolicy =
AutoFailoverPolicyFactory.create(policyData.getAutoFailoverPolicy());
+ this.unloadScope = policyData.getUnloadScope();
}
@Override
@@ -76,6 +79,11 @@ public class NamespaceIsolationPolicyImpl implements
NamespaceIsolationPolicy {
return this.secondary;
}
+ @Override
+ public NamespaceIsolationPolicyUnloadScope getUnloadScope() {
+ return this.unloadScope;
+ }
+
@Override
public List<URL> findPrimaryBrokers(List<URL> availableBrokers,
NamespaceName namespace) {
if (!this.matchNamespaces(namespace.toString())) {