This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f002590450 [improve][admin] PIP-369 Change default value of 
`unload-scope` in `ns-isolation-policy set` (#23253)
4f002590450 is described below

commit 4f002590450f756a26375fb551136bb11dd72666
Author: Girish Sharma <grs...@users.noreply.github.com>
AuthorDate: Wed Sep 18 14:14:49 2024 +0530

    [improve][admin] PIP-369 Change default value of `unload-scope` in 
`ns-isolation-policy set` (#23253)
    
    Co-authored-by: Zixuan Liu <node...@gmail.com>
---
 .../pulsar/broker/admin/impl/ClustersBase.java     | 65 +++++++++++-----------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 38 +++++++++++--
 .../admin/cli/CmdNamespaceIsolationPolicy.java     | 12 ++--
 .../policies/data/NamespaceIsolationDataImpl.java  |  4 +-
 4 files changed, 72 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 132c99ce16b..b261033ca52 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -778,15 +778,16 @@ public class ClustersBase extends AdminResource {
         } catch (PulsarServerException e) {
             return FutureUtil.failedFuture(e);
         }
-        // compile regex patterns once
-        List<Pattern> namespacePatterns = 
policyData.getNamespaces().stream().map(Pattern::compile).toList();
-        // TODO for 4.x, we should include both old and new namespace regex 
pattern for unload `all_matching` option
+        Set<String> combinedNamespaces = new 
HashSet<>(policyData.getNamespaces());
+        final List<String> oldNamespaces = new ArrayList<>();
+        if (oldPolicy != null) {
+            oldNamespaces.addAll(oldPolicy.getNamespaces());
+            combinedNamespaces.addAll(oldNamespaces);
+        }
         return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
             List<CompletableFuture<List<String>>> 
filteredNamespacesForEachTenant = tenants.stream()
                     .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
                         List<CompletableFuture<String>> 
namespaceNamesInCluster = namespaces.stream()
-                                .filter(namespaceName -> 
namespacePatterns.stream()
-                                        .anyMatch(pattern -> 
pattern.matcher(namespaceName).matches()))
                                 .map(namespaceName -> 
adminClient.namespaces().getPoliciesAsync(namespaceName)
                                         .thenApply(policies -> 
policies.replication_clusters.contains(cluster)
                                                 ? namespaceName : null))
@@ -802,46 +803,44 @@ public class ClustersBase extends AdminResource {
                             .map(CompletableFuture::join)
                             .flatMap(List::stream)
                             .collect(Collectors.toList()));
-        }).thenCompose(shouldUnloadNamespaces -> {
-            if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
+        }).thenCompose(clusterLocalNamespaces -> {
+            if (CollectionUtils.isEmpty(clusterLocalNamespaces)) {
                 return CompletableFuture.completedFuture(null);
             }
             // If unload type is 'changed', we need to figure out a further 
subset of namespaces whose placement might
             // actually have been changed.
 
             log.debug("Old policy: {} ; new policy: {}", oldPolicy, 
policyData);
-            if (oldPolicy != null && 
NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope()))
 {
-                // We also compare that the previous primary broker list is 
same as current, in case all namespaces need
-                // to be placed again anyway.
-                if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), 
policyData.getPrimary())) {
-                    // list is same, so we continue finding the changed 
namespaces.
 
-                    // We create a union regex list contains old + new regexes
-                    Set<String> combinedNamespaces = new 
HashSet<>(oldPolicy.getNamespaces());
-                    combinedNamespaces.addAll(policyData.getNamespaces());
-                    // We create a intersection of the old and new regexes. 
These won't need to be unloaded
-                    Set<String> commonNamespaces = new 
HashSet<>(oldPolicy.getNamespaces());
-                    commonNamespaces.retainAll(policyData.getNamespaces());
+            boolean unloadAllNamespaces = false;
+            // We also compare that the previous primary broker list is same 
as current, in case all namespaces need
+            // to be placed again anyway.
+            if 
(NamespaceIsolationPolicyUnloadScope.all_matching.equals(policyData.getUnloadScope())
+                    || (oldPolicy != null
+                    && 
!CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), 
policyData.getPrimary()))) {
+                unloadAllNamespaces = true;
+            }
+            // Otherwise, we continue by finding only the changed namespaces.
 
-                    log.debug("combined regexes: {}; common regexes:{}", 
combinedNamespaces, combinedNamespaces);
+            // We create an intersection of the old and new regexes. These 
won't need to be unloaded.
+            Set<String> commonNamespaces = new 
HashSet<>(policyData.getNamespaces());
+            commonNamespaces.retainAll(oldNamespaces);
 
-                    // Find the changed regexes (new - new ∩ old). TODO for 
4.x, make this (new U old - new ∩ old)
-                    combinedNamespaces.removeAll(commonNamespaces);
+            log.debug("combined regexes: {}; common regexes:{}", 
combinedNamespaces, commonNamespaces);
 
-                    log.debug("changed regexes: {}", commonNamespaces);
+            if (!unloadAllNamespaces) {
+                // Find the changed regexes ((new U old) - (new ∩ old)).
+                combinedNamespaces.removeAll(commonNamespaces);
+                log.debug("changed regexes: {}", commonNamespaces);
+            }
 
-                    // Now we further filter the filtered namespaces based on 
this combinedNamespaces set
-                    shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
-                            .filter(name -> combinedNamespaces.stream()
-                                    .map(Pattern::compile)
-                                    .anyMatch(pattern -> 
pattern.matcher(name).matches())
-                            ).toList();
+            // Now we further filter the filtered namespaces based on this 
combinedNamespaces set
+            List<Pattern> namespacePatterns = 
combinedNamespaces.stream().map(Pattern::compile).toList();
+            clusterLocalNamespaces = clusterLocalNamespaces.stream()
+                    .filter(name -> 
namespacePatterns.stream().anyMatch(pattern -> pattern.matcher(name).matches()))
+                    .toList();
 
-                }
-            }
-            // unload type is either null or not in (changed, none), so we 
proceed to unload all namespaces
-            // TODO - default in 4.x should become `changed`
-            List<CompletableFuture<Void>> futures = 
shouldUnloadNamespaces.stream()
+            List<CompletableFuture<Void>> futures = 
clusterLocalNamespaces.stream()
                     .map(namespaceName -> 
adminClient.namespaces().unloadAsync(namespaceName))
                     .collect(Collectors.toList());
             return FutureUtil.waitForAll(futures).thenAccept(__ -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 155994c814c..df9862691d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3488,7 +3488,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         parameters1.put("usage_threshold", "100");
         List<String> nsRegexList = new ArrayList<>(namespaces);
 
-        return NamespaceIsolationData.builder()
+        NamespaceIsolationData.Builder build = NamespaceIsolationData.builder()
                 // "prop-ig/ns1" is present in test cluster, policy set on 
test2 should work
                 .namespaces(nsRegexList)
                 .primary(primaryBrokers)
@@ -3496,9 +3496,11 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
                 .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                         .policyType(AutoFailoverPolicyType.min_available)
                         .parameters(parameters1)
-                        .build())
-                .unloadScope(scope)
-                .build();
+                        .build());
+        if (scope != null) {
+            build.unloadScope(scope);
+        }
+        return build.build();
     }
 
     private boolean allTopicsUnloaded(List<String> topics) {
@@ -3624,18 +3626,42 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         testIsolationPolicyUnloadsNSWithScope(
                 topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", 
"b2", "c1"),
                 all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                all_matching, List.of(".*-unload-test-c.*"), List.of("b1", 
"b2"),
+                Collections.singletonList(".*")
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithChangedScope1(final String 
topicType) throws Exception {
+        String nsPrefix1 = newUniqueName(defaultTenant + "/") + 
"-unload-test-";
+        // Addition case
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-changed1", nsPrefix1, List.of("a1", "a2", 
"b1", "b2", "c1"),
+                all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
+                changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("a1", "a2", "b1", "b2"),
+                Collections.singletonList(".*")
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testIsolationPolicyUnloadsNSWithChangedScope2(final String 
topicType) throws Exception {
+        String nsPrefix2 = newUniqueName(defaultTenant + "/") + 
"-unload-test-";
+        // removal case
+        testIsolationPolicyUnloadsNSWithScope(
+                topicType, "policy-changed2", nsPrefix2, List.of("a1", "a2", 
"b1", "b2", "c1"),
                 all_matching, List.of(".*-unload-test-a.*", 
".*-unload-test-c.*"), List.of("b1", "b2"),
+                changed, List.of(".*-unload-test-c.*"), List.of("b1", "b2", 
"c1"),
                 Collections.singletonList(".*")
         );
     }
 
     @Test(dataProvider = "topicType")
-    public void testIsolationPolicyUnloadsNSWithChangedScope(final String 
topicType) throws Exception {
+    public void testIsolationPolicyUnloadsNSWithScopeMissing(final String 
topicType) throws Exception {
         String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
         testIsolationPolicyUnloadsNSWithScope(
                 topicType, "policy-changed", nsPrefix, List.of("a1", "a2", 
"b1", "b2", "c1"),
                 all_matching, List.of(".*-unload-test-a.*"), List.of("b1", 
"b2", "c1"),
-                changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("a1", "a2", "b1", "b2"),
+                null, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), 
List.of("a1", "a2", "b1", "b2"),
                 Collections.singletonList(".*")
         );
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
index 0f5f6b211a5..ef36eb41713 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
@@ -75,12 +75,12 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
         private Map<String, String> autoFailoverPolicyParams;
 
         @Option(names = "--unload-scope", description = "configure the type of 
unload to do -"
-                + " ['all_matching', 'none', 'changed'] namespaces. By 
default, all namespaces matching the namespaces"
-                + " regex will be unloaded and placed again. You can choose to 
not unload any namespace while setting"
-                + " this new policy by choosing `none` or choose to unload 
only the namespaces whose placement will"
-                + " actually change. If you chose 'none', you will need to 
manually unload the namespaces for them to"
-                + " be placed correctly, or wait till some namespaces get load 
balanced automatically based on load"
-                + " shedding configurations.")
+                + " ['all_matching', 'none', 'changed'] namespaces. By 
default, only namespaces whose placement will"
+                + " actually change would be unloaded and placed again. You 
can choose to not unload any namespace"
+                + " while setting this new policy by choosing `none` or choose 
to unload all namespaces matching"
+                + " old (if any) and new namespace regex. If you chose 'none', 
you will need to manually unload the"
+                + " namespaces for them to be placed correctly, or wait till 
some namespaces get load balanced"
+                + " automatically based on load shedding configurations.")
         private NamespaceIsolationPolicyUnloadScope unloadScope;
 
         void run() throws PulsarAdminException {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
index 1e72f0e50ee..85be8090f52 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
@@ -78,8 +78,8 @@ public class NamespaceIsolationDataImpl implements 
NamespaceIsolationData {
     @ApiModelProperty(
             name = "unload_scope",
             value = "The type of unload to perform while applying the new 
isolation policy.",
-            example = "'all_matching' (default) for unloading all matching 
namespaces. 'none' for not unloading "
-                    + "any namespace. 'changed' for unloading only the 
namespaces whose placement is actually changing"
+            example = "'changed' (default) for unloading only the namespaces 
whose placement is actually changing. "
+                    + "'all_matching' for unloading all matching namespaces. 
'none' for not unloading any namespaces."
     )
     @JsonProperty("unload_scope")
     private NamespaceIsolationPolicyUnloadScope unloadScope;

Reply via email to