This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new f9283dab56c [improve][admin] PIP-369 Introduce `unload` flag in 
`ns-isolation-policy set` call (#23120)
f9283dab56c is described below

commit f9283dab56cc96a4202c98f37342ebc302acac3a
Author: Girish Sharma <[email protected]>
AuthorDate: Sun Sep 1 09:49:45 2024 +0530

    [improve][admin] PIP-369 Introduce `unload` flag in `ns-isolation-policy 
set` call (#23120)
    
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 8da3bf8322c536c495541c80926cdf9389612515)
---
 .../pulsar/broker/admin/impl/ClustersBase.java     |  54 +++++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 208 ++++++++++++++++++---
 .../policies/data/NamespaceIsolationData.java      |   4 +
 ...va => NamespaceIsolationPolicyUnloadScope.java} |  44 ++---
 .../admin/cli/CmdNamespaceIsolationPolicy.java     |  17 +-
 .../common/policies/NamespaceIsolationPolicy.java  |   6 +
 .../policies/data/NamespaceIsolationDataImpl.java  |  17 +-
 .../impl/NamespaceIsolationPolicyImpl.java         |   8 +
 8 files changed, 302 insertions(+), 56 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 4fe8a01e679..132c99ce16b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -27,11 +27,13 @@ import io.swagger.annotations.Example;
 import io.swagger.annotations.ExampleProperty;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -65,6 +67,7 @@ import 
org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -721,10 +724,13 @@ public class ClustersBase extends AdminResource {
                                         
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
                                         .thenApply(__ -> new 
NamespaceIsolationPolicies()))
                 ).thenCompose(nsIsolationPolicies -> {
+                    NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
+                            .getPolicies().getOrDefault(policyName, null);
                     nsIsolationPolicies.setPolicy(policyName, policyData);
                     return namespaceIsolationPolicies()
-                            .setIsolationDataAsync(cluster, old -> 
nsIsolationPolicies.getPolicies());
-                }).thenCompose(__ -> 
filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
+                            .setIsolationDataAsync(cluster, old -> 
nsIsolationPolicies.getPolicies())
+                            .thenApply(__ -> oldPolicy);
+                }).thenCompose(oldPolicy -> 
filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
                 .thenAccept(__ -> {
                     log.info("[{}] Successful to update 
clusters/{}/namespaceIsolationPolicies/{}.",
                             clientAppId(), cluster, policyName);
@@ -759,7 +765,13 @@ public class ClustersBase extends AdminResource {
      * Get matched namespaces; call unload for each namespaces.
      */
     private CompletableFuture<Void> 
filterAndUnloadMatchedNamespaceAsync(String cluster,
-                                                                         
NamespaceIsolationDataImpl policyData) {
+                                                                         
NamespaceIsolationDataImpl policyData,
+                                                                         
NamespaceIsolationDataImpl oldPolicy) {
+        // exit early if none of the namespaces need to be unloaded
+        if 
(NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         PulsarAdmin adminClient;
         try {
             adminClient = pulsar().getAdminClient();
@@ -768,6 +780,7 @@ public class ClustersBase extends AdminResource {
         }
         // compile regex patterns once
         List<Pattern> namespacePatterns = 
policyData.getNamespaces().stream().map(Pattern::compile).toList();
+        // TODO for 4.x, we should include both old and new namespace regex 
pattern for unload `all_matching` option
         return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
             List<CompletableFuture<List<String>>> 
filteredNamespacesForEachTenant = tenants.stream()
                     .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
@@ -793,6 +806,41 @@ public class ClustersBase extends AdminResource {
             if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
                 return CompletableFuture.completedFuture(null);
             }
+            // If unload type is 'changed', we need to figure out a further 
subset of namespaces whose placement might
+            // actually have been changed.
+
+            log.debug("Old policy: {} ; new policy: {}", oldPolicy, 
policyData);
+            if (oldPolicy != null && 
NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope()))
 {
+                // We also compare that the previous primary broker list is 
same as current, in case all namespaces need
+                // to be placed again anyway.
+                if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), 
policyData.getPrimary())) {
+                    // list is same, so we continue finding the changed 
namespaces.
+
+                    // We create a union regex list containing old + new regexes
+                    Set<String> combinedNamespaces = new 
HashSet<>(oldPolicy.getNamespaces());
+                    combinedNamespaces.addAll(policyData.getNamespaces());
+                    // We create an intersection of the old and new regexes. 
These won't need to be unloaded
+                    Set<String> commonNamespaces = new 
HashSet<>(oldPolicy.getNamespaces());
+                    commonNamespaces.retainAll(policyData.getNamespaces());
+
+                    log.debug("combined regexes: {}; common regexes:{}", 
combinedNamespaces, combinedNamespaces);
+
+                    // Find the changed regexes (new - new ∩ old). TODO for 
4.x, make this (new U old - new ∩ old)
+                    combinedNamespaces.removeAll(commonNamespaces);
+
+                    log.debug("changed regexes: {}", commonNamespaces);
+
+                    // Now we further filter the filtered namespaces based on 
this combinedNamespaces set
+                    shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
+                            .filter(name -> combinedNamespaces.stream()
+                                    .map(Pattern::compile)
+                                    .anyMatch(pattern -> 
pattern.matcher(name).matches())
+                            ).toList();
+
+                }
+            }
+            // unload type is either null or not in (changed, none), so we 
proceed to unload all namespaces
+            // TODO - default in 4.x should become `changed`
             List<CompletableFuture<Void>> futures = 
shouldUnloadNamespaces.stream()
                     .map(namespaceName -> 
adminClient.namespaces().unloadAsync(namespaceName))
                     .collect(Collectors.toList());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 40e2ca8cce9..155994c814c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -53,6 +54,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.ws.rs.NotAcceptableException;
@@ -109,27 +111,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
-import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
-import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.EntryFilters;
-import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
-import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.*;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -3496,4 +3478,188 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         // cleanup.
         admin.topics().deletePartitionedTopic(partitionedTp);
     }
+
+    private NamespaceIsolationData 
createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String> 
namespaces,
+                                                    List<String> primaryBrokers
+    ) {
+        // setup ns-isolation-policy in both the clusters.
+        Map<String, String> parameters1 = new HashMap<>();
+        parameters1.put("min_limit", "1");
+        parameters1.put("usage_threshold", "100");
+        List<String> nsRegexList = new ArrayList<>(namespaces);
+
+        return NamespaceIsolationData.builder()
+                // "prop-ig/ns1" is present in test cluster, policy set on 
test2 should work
+                .namespaces(nsRegexList)
+                .primary(primaryBrokers)
+                .secondary(Collections.singletonList(""))
+                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+                        .policyType(AutoFailoverPolicyType.min_available)
+                        .parameters(parameters1)
+                        .build())
+                .unloadScope(scope)
+                .build();
+    }
+
+    private boolean allTopicsUnloaded(List<String> topics) {
+        for (String topic : topics) {
+            if 
(pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void loadTopics(List<String> topics) throws PulsarClientException, 
ExecutionException, InterruptedException {
+        // create a topic by creating a producer so that the topic is present 
on the broker
+        for (String topic : topics) {
+            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+            producer.close();
+            pulsar.getBrokerService().getTopicIfExists(topic).get();
+        }
+
+        // All topics should now be loaded onto the broker; assert that each one is present.
+        for (String topic : topics) {
+            
assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
+        }
+    }
+
+    /**
+     * Validates that the namespace isolation policy set and update is 
unloading only the relevant namespaces based on
+     * the unload scope provided.
+     *
+     * @param topicType persistent or non persistent.
+     * @param policyName policy name.
+     * @param nsPrefix unique namespace prefix.
+     * @param totalNamespaces total namespaces to create. Only the end part. 
Each namespace also gets a topic t1.
+     * @param initialScope unload scope while creating the policy.
+     * @param initialNamespaceRegex namespace regex while creating the policy.
+     * @param initialLoadedNS expected namespaces to be still loaded after the 
policy create call. Remaining namespaces
+     *                        will be asserted to be unloaded within 20 
seconds.
+     * @param updatedScope unload scope while updating the policy.
+     * @param updatedNamespaceRegex namespace regex while updating the policy.
+     * @param updatedLoadedNS expected namespaces to be loaded after policy 
update call. Remaining namespaces will be
+     *                        asserted to be unloaded within 20 seconds.
+     * @throws PulsarAdminException
+     * @throws PulsarClientException
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    private void testIsolationPolicyUnloadsNSWithScope(String topicType, 
String policyName, String nsPrefix,
+                                                       List<String> 
totalNamespaces,
+                                                       
NamespaceIsolationPolicyUnloadScope initialScope,
+                                                       List<String> 
initialNamespaceRegex, List<String> initialLoadedNS,
+                                                       
NamespaceIsolationPolicyUnloadScope updatedScope,
+                                                       List<String> 
updatedNamespaceRegex, List<String> updatedLoadedNS,
+                                                       List<String> 
updatedBrokerRegex)
+            throws PulsarAdminException, PulsarClientException, 
ExecutionException, InterruptedException {
+
+        // Create all namespaces
+        List<String> allTopics = new ArrayList<>();
+        for (String namespacePart: totalNamespaces) {
+            admin.namespaces().createNamespace(nsPrefix + namespacePart, 
Set.of("test"));
+            allTopics.add(topicType + "://" + nsPrefix + namespacePart + 
"/t1");
+        }
+        // Load all topics so that they are present. Assume topic t1 under 
each namespace
+        loadTopics(allTopics);
+
+        // Create the policy
+        NamespaceIsolationData nsPolicyData1 = createPolicyData(
+                initialScope, initialNamespaceRegex, 
Collections.singletonList(".*")
+        );
+        admin.clusters().createNamespaceIsolationPolicy("test", policyName, 
nsPolicyData1);
+
+        List<String> initialLoadedTopics = new ArrayList<>();
+        for (String namespacePart: initialLoadedNS) {
+            initialLoadedTopics.add(topicType + "://" + nsPrefix + 
namespacePart + "/t1");
+        }
+
+        List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
+        initialUnloadedTopics.removeAll(initialLoadedTopics);
+
+        // Assert that all topics (and thus ns) not under initialLoadedNS 
namespaces are unloaded
+        if (initialUnloadedTopics.isEmpty()) {
+            // Just wait a bit to ensure we don't miss lazy unloading of 
topics we expect not to unload
+            TimeUnit.SECONDS.sleep(5);
+        } else {
+            Awaitility.await()
+                    .atMost(10, TimeUnit.SECONDS)
+                    .until(() -> allTopicsUnloaded(initialUnloadedTopics));
+        }
+        // Assert that all topics under initialLoadedNS are still present
+        initialLoadedTopics.forEach(t -> 
assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+        // Load the topics again
+        loadTopics(allTopics);
+
+        // Update policy using updatedScope with updated namespace regex
+        nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex, 
updatedBrokerRegex);
+        admin.clusters().updateNamespaceIsolationPolicy("test", policyName, 
nsPolicyData1);
+
+        List<String> updatedLoadedTopics = new ArrayList<>();
+        for (String namespacePart : updatedLoadedNS) {
+            updatedLoadedTopics.add(topicType + "://" + nsPrefix + 
namespacePart + "/t1");
+        }
+
+        List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
+        updatedUnloadedTopics.removeAll(updatedLoadedTopics);
+
+        // Assert that all topics (and thus ns) not under updatedLoadedNS 
namespaces are unloaded
+        if (updatedUnloadedTopics.isEmpty()) {
+            // Just wait a bit to ensure we don't miss lazy unloading of 
topics we expect not to unload
+            TimeUnit.SECONDS.sleep(5);
+        } else {
+            Awaitility.await()
+                    .atMost(10, TimeUnit.SECONDS)
+                    .until(() -> allTopicsUnloaded(updatedUnloadedTopics));
+        }
+        // Assert that all topics under updatedLoadedNS are still present
+        updatedLoadedTopics.forEach(t -> 
assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithAllScope(final String 
topicType) throws Exception {
+        String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", 
"b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*", 
".*-unload-test-c.*"), List.of("b1", "b2"),
+                Collections.singletonList(".*")
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithChangedScope(final String 
topicType) throws Exception {
+        String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-changed", nsPrefix, List.of("a1", "a2", 
"b1", "b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("a1", "a2", "b1", "b2"),
+                Collections.singletonList(".*")
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithNoneScope(final String 
topicType) throws Exception {
+        String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", 
"b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("a1", "a2", "b1", "b2", "c1"),
+                Collections.singletonList(".*")
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String 
topicType) throws Exception {
+        String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+        // As per changed flag, only c1 should unload, but due to primary 
change, both a* and c* will.
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-primary-changed", nsPrefix, List.of("a1", 
"a2", "b1", "b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("b1", "b2"),
+                List.of(".*", "broker.*")
+        );
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
index aa48e69c145..4f367f72fda 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
@@ -31,6 +31,8 @@ public interface NamespaceIsolationData {
 
     AutoFailoverPolicyData getAutoFailoverPolicy();
 
+    NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
     void validate();
 
     interface Builder {
@@ -42,6 +44,8 @@ public interface NamespaceIsolationData {
 
         Builder autoFailoverPolicy(AutoFailoverPolicyData 
autoFailoverPolicyData);
 
+        Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);
+
         NamespaceIsolationData build();
     }
 
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
similarity index 53%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
index aa48e69c145..2edeac45630 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
@@ -18,34 +18,20 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-import java.util.List;
-import org.apache.pulsar.client.admin.utils.ReflectionUtils;
-
-public interface NamespaceIsolationData {
-
-    List<String> getNamespaces();
-
-    List<String> getPrimary();
-
-    List<String> getSecondary();
-
-    AutoFailoverPolicyData getAutoFailoverPolicy();
-
-    void validate();
-
-    interface Builder {
-        Builder namespaces(List<String> namespaces);
-
-        Builder primary(List<String> primary);
-
-        Builder secondary(List<String> secondary);
-
-        Builder autoFailoverPolicy(AutoFailoverPolicyData 
autoFailoverPolicyData);
-
-        NamespaceIsolationData build();
-    }
-
-    static Builder builder() {
-        return 
ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl");
+/**
+ * The type of unload to perform while setting the isolation policy.
+ */
+public enum NamespaceIsolationPolicyUnloadScope {
+    all_matching, // unloads all matching namespaces as per new regex
+    none,         // unloads no namespaces
+    changed;      // unloads only the namespaces which are newly added or 
removed from the regex list
+
+    public static NamespaceIsolationPolicyUnloadScope fromString(String 
unloadScopeString) {
+        for (NamespaceIsolationPolicyUnloadScope unloadScope : 
NamespaceIsolationPolicyUnloadScope.values()) {
+            if (unloadScope.toString().equalsIgnoreCase(unloadScopeString)) {
+                return unloadScope;
+            }
+        }
+        return null;
     }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
index e9896decd8c..0f5f6b211a5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
@@ -32,6 +32,7 @@ import 
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
@@ -73,10 +74,19 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
                 required = true, split = ",")
         private Map<String, String> autoFailoverPolicyParams;
 
+        @Option(names = "--unload-scope", description = "configure the type of 
unload to do -"
+                + " ['all_matching', 'none', 'changed'] namespaces. By 
default, all namespaces matching the namespaces"
+                + " regex will be unloaded and placed again. You can choose to 
not unload any namespace while setting"
+                + " this new policy by choosing `none` or choose to unload 
only the namespaces whose placement will"
+                + " actually change. If you chose 'none', you will need to 
manually unload the namespaces for them to"
+                + " be placed correctly, or wait till some namespaces get load 
balanced automatically based on load"
+                + " shedding configurations.")
+        private NamespaceIsolationPolicyUnloadScope unloadScope;
+
         void run() throws PulsarAdminException {
             // validate and create the POJO
             NamespaceIsolationData namespaceIsolationData = 
createNamespaceIsolationData(namespaces, primary, secondary,
-                    autoFailoverPolicyTypeName, autoFailoverPolicyParams);
+                    autoFailoverPolicyTypeName, autoFailoverPolicyParams, 
unloadScope);
 
             getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, 
policyName, namespaceIsolationData);
         }
@@ -167,7 +177,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
                                                                 List<String> 
primary,
                                                                 List<String> 
secondary,
                                                                 String 
autoFailoverPolicyTypeName,
-                                                                Map<String, 
String> autoFailoverPolicyParams) {
+                                                                Map<String, 
String> autoFailoverPolicyParams,
+                                                                
NamespaceIsolationPolicyUnloadScope unload) {
 
         // validate
         namespaces = validateList(namespaces);
@@ -234,6 +245,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
             throw new ParameterException("Unknown auto failover policy type 
specified : " + autoFailoverPolicyTypeName);
         }
 
+        nsIsolationDataBuilder.unloadScope(unload);
+
         return nsIsolationDataBuilder.build();
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
index bd28d30d4ce..52480d91eef 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.SortedSet;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BrokerStatus;
+import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
 
 /**
  * Namespace isolation policy.
@@ -43,6 +44,11 @@ public interface NamespaceIsolationPolicy {
      */
     List<String> getSecondaryBrokers();
 
+    /**
+     * Get the unload scope for the policy set call.
+     */
+    NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
     /**
      * Get the list of primary brokers for the namespace according to the 
policy.
      *
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
index bdb51f63f89..1e72f0e50ee 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
@@ -75,6 +75,15 @@ public class NamespaceIsolationDataImpl implements 
NamespaceIsolationData {
     @JsonProperty("auto_failover_policy")
     private AutoFailoverPolicyData autoFailoverPolicy;
 
+    @ApiModelProperty(
+            name = "unload_scope",
+            value = "The type of unload to perform while applying the new 
isolation policy.",
+            example = "'all_matching' (default) for unloading all matching 
namespaces. 'none' for not unloading "
+                    + "any namespace. 'changed' for unloading only the 
namespaces whose placement is actually changing"
+    )
+    @JsonProperty("unload_scope")
+    private NamespaceIsolationPolicyUnloadScope unloadScope;
+
     public static NamespaceIsolationDataImplBuilder builder() {
         return new NamespaceIsolationDataImplBuilder();
     }
@@ -106,6 +115,7 @@ public class NamespaceIsolationDataImpl implements 
NamespaceIsolationData {
         private List<String> primary = new ArrayList<>();
         private List<String> secondary = new ArrayList<>();
         private AutoFailoverPolicyData autoFailoverPolicy;
+        private NamespaceIsolationPolicyUnloadScope unloadScope;
 
         public NamespaceIsolationDataImplBuilder namespaces(List<String> 
namespaces) {
             this.namespaces = namespaces;
@@ -127,8 +137,13 @@ public class NamespaceIsolationDataImpl implements 
NamespaceIsolationData {
             return this;
         }
 
+        public NamespaceIsolationDataImplBuilder 
unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope) {
+            this.unloadScope = unloadScope;
+            return this;
+        }
+
         public NamespaceIsolationDataImpl build() {
-            return new NamespaceIsolationDataImpl(namespaces, primary, 
secondary, autoFailoverPolicy);
+            return new NamespaceIsolationDataImpl(namespaces, primary, 
secondary, autoFailoverPolicy, unloadScope);
         }
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
index af3663869fa..440282f29cb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.policies.AutoFailoverPolicy;
 import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
 import org.apache.pulsar.common.policies.data.BrokerStatus;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
 
 /**
  * Implementation of the namespace isolation policy.
@@ -39,6 +40,7 @@ public class NamespaceIsolationPolicyImpl implements 
NamespaceIsolationPolicy {
     private List<String> primary;
     private List<String> secondary;
     private AutoFailoverPolicy autoFailoverPolicy;
+    private NamespaceIsolationPolicyUnloadScope unloadScope;
 
     private boolean matchNamespaces(String fqnn) {
         for (String nsRegex : namespaces) {
@@ -64,6 +66,7 @@ public class NamespaceIsolationPolicyImpl implements 
NamespaceIsolationPolicy {
         this.primary = policyData.getPrimary();
         this.secondary = policyData.getSecondary();
         this.autoFailoverPolicy = 
AutoFailoverPolicyFactory.create(policyData.getAutoFailoverPolicy());
+        this.unloadScope = policyData.getUnloadScope();
     }
 
     @Override
@@ -76,6 +79,11 @@ public class NamespaceIsolationPolicyImpl implements 
NamespaceIsolationPolicy {
         return this.secondary;
     }
 
+    @Override
+    public NamespaceIsolationPolicyUnloadScope getUnloadScope() {
+        return this.unloadScope;
+    }
+
     @Override
     public List<URL> findPrimaryBrokers(List<URL> availableBrokers, 
NamespaceName namespace) {
         if (!this.matchNamespaces(namespace.toString())) {

Reply via email to