This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4f002590450 [improve][admin] PIP-369 Change default value of `unload-scope` in `ns-isolation-policy set` (#23253) 4f002590450 is described below commit 4f002590450f756a26375fb551136bb11dd72666 Author: Girish Sharma <grs...@users.noreply.github.com> AuthorDate: Wed Sep 18 14:14:49 2024 +0530 [improve][admin] PIP-369 Change default value of `unload-scope` in `ns-isolation-policy set` (#23253) Co-authored-by: Zixuan Liu <node...@gmail.com> --- .../pulsar/broker/admin/impl/ClustersBase.java | 65 +++++++++++----------- .../apache/pulsar/broker/admin/AdminApi2Test.java | 38 +++++++++++-- .../admin/cli/CmdNamespaceIsolationPolicy.java | 12 ++-- .../policies/data/NamespaceIsolationDataImpl.java | 4 +- 4 files changed, 72 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 132c99ce16b..b261033ca52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -778,15 +778,16 @@ public class ClustersBase extends AdminResource { } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } - // compile regex patterns once - List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); - // TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option + Set<String> combinedNamespaces = new HashSet<>(policyData.getNamespaces()); + final List<String> oldNamespaces = new ArrayList<>(); + if (oldPolicy != null) { + oldNamespaces.addAll(oldPolicy.getNamespaces()); + combinedNamespaces.addAll(oldNamespaces); + } return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream() .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream() - .filter(namespaceName -> namespacePatterns.stream() - .anyMatch(pattern -> pattern.matcher(namespaceName).matches())) .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) .thenApply(policies -> policies.replication_clusters.contains(cluster) ? namespaceName : null)) @@ -802,46 +803,44 @@ public class ClustersBase extends AdminResource { .map(CompletableFuture::join) .flatMap(List::stream) .collect(Collectors.toList())); - }).thenCompose(shouldUnloadNamespaces -> { - if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) { + }).thenCompose(clusterLocalNamespaces -> { + if (CollectionUtils.isEmpty(clusterLocalNamespaces)) { return CompletableFuture.completedFuture(null); } // If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might // actually have been changed. log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData); - if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) { - // We also compare that the previous primary broker list is same as current, in case all namespaces need - // to be placed again anyway. - if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) { - // list is same, so we continue finding the changed namespaces. - // We create a union regex list contains old + new regexes - Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces()); - combinedNamespaces.addAll(policyData.getNamespaces()); - // We create a intersection of the old and new regexes. These won't need to be unloaded - Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces()); - commonNamespaces.retainAll(policyData.getNamespaces()); + boolean unloadAllNamespaces = false; + // We also compare that the previous primary broker list is same as current, in case all namespaces need + // to be placed again anyway. + if (NamespaceIsolationPolicyUnloadScope.all_matching.equals(policyData.getUnloadScope()) + || (oldPolicy != null + && !CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary()))) { + unloadAllNamespaces = true; + } + // list is same, so we continue finding the changed namespaces. - log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces); + // We create a intersection of the old and new regexes. These won't need to be unloaded. + Set<String> commonNamespaces = new HashSet<>(policyData.getNamespaces()); + commonNamespaces.retainAll(oldNamespaces); - // Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old) - combinedNamespaces.removeAll(commonNamespaces); + log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, commonNamespaces); - log.debug("changed regexes: {}", commonNamespaces); + if (!unloadAllNamespaces) { + // Find the changed regexes ((new U old) - (new ∩ old)). + combinedNamespaces.removeAll(commonNamespaces); + log.debug("changed regexes: {}", commonNamespaces); + } - // Now we further filter the filtered namespaces based on this combinedNamespaces set - shouldUnloadNamespaces = shouldUnloadNamespaces.stream() - .filter(name -> combinedNamespaces.stream() - .map(Pattern::compile) - .anyMatch(pattern -> pattern.matcher(name).matches()) - ).toList(); + // Now we further filter the filtered namespaces based on this combinedNamespaces set + List<Pattern> namespacePatterns = combinedNamespaces.stream().map(Pattern::compile).toList(); + clusterLocalNamespaces = clusterLocalNamespaces.stream() + .filter(name -> namespacePatterns.stream().anyMatch(pattern -> pattern.matcher(name).matches())) + .toList(); - } - } - // unload type is either null or not in (changed, none), so we proceed to unload all namespaces - // TODO - default in 4.x should become `changed` - List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream() + List<CompletableFuture<Void>> futures = clusterLocalNamespaces.stream() .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) .collect(Collectors.toList()); return FutureUtil.waitForAll(futures).thenAccept(__ -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 155994c814c..df9862691d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3488,7 +3488,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { parameters1.put("usage_threshold", "100"); List<String> nsRegexList = new ArrayList<>(namespaces); - return NamespaceIsolationData.builder() + NamespaceIsolationData.Builder build = NamespaceIsolationData.builder() // "prop-ig/ns1" is present in test cluster, policy set on test2 should work .namespaces(nsRegexList) .primary(primaryBrokers) @@ -3496,9 +3496,11 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) .parameters(parameters1) - .build()) - .unloadScope(scope) - .build(); + .build()); + if (scope != null) { + build.unloadScope(scope); + } + return build.build(); } private boolean allTopicsUnloaded(List<String> topics) { @@ -3624,18 +3626,42 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { testIsolationPolicyUnloadsNSWithScope( topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-c.*"), List.of("b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithChangedScope1(final String topicType) throws Exception { + String nsPrefix1 = newUniqueName(defaultTenant + "/") + "-unload-test-"; + // Addition case + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-changed1", nsPrefix1, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithChangedScope2(final String topicType) throws Exception { + String nsPrefix2 = newUniqueName(defaultTenant + "/") + "-unload-test-"; + // removal case + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-changed2", nsPrefix2, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"), + changed, List.of(".*-unload-test-c.*"), List.of("b1", "b2", "c1"), Collections.singletonList(".*") ); } @Test(dataProvider = "topicType") - public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception { + public void testIsolationPolicyUnloadsNSWithScopeMissing(final String topicType) throws Exception { String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; testIsolationPolicyUnloadsNSWithScope( topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), - changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), + null, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), Collections.singletonList(".*") ); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index 0f5f6b211a5..ef36eb41713 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -75,12 +75,12 @@ public class CmdNamespaceIsolationPolicy extends CmdBase { private Map<String, String> autoFailoverPolicyParams; @Option(names = "--unload-scope", description = "configure the type of unload to do -" - + " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces" - + " regex will be unloaded and placed again. You can choose to not unload any namespace while setting" - + " this new policy by choosing `none` or choose to unload only the namespaces whose placement will" - + " actually change. If you chose 'none', you will need to manually unload the namespaces for them to" - + " be placed correctly, or wait till some namespaces get load balanced automatically based on load" - + " shedding configurations.") + + " ['all_matching', 'none', 'changed'] namespaces. By default, only namespaces whose placement will" + + " actually change would be unloaded and placed again. You can choose to not unload any namespace" + + " while setting this new policy by choosing `none` or choose to unload all namespaces matching" + + " old (if any) and new namespace regex. If you chose 'none', you will need to manually unload the" + + " namespaces for them to be placed correctly, or wait till some namespaces get load balanced" + + " automatically based on load shedding configurations.") private NamespaceIsolationPolicyUnloadScope unloadScope; void run() throws PulsarAdminException { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java index 1e72f0e50ee..85be8090f52 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java @@ -78,8 +78,8 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData { @ApiModelProperty( name = "unload_scope", value = "The type of unload to perform while applying the new isolation policy.", - example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading " - + "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing" + example = "'changed' (default) for unloading only the namespaces whose placement is actually changing. " + + "'all_matching' for unloading all matching namespaces. 'none' for not unloading any namespaces." ) @JsonProperty("unload_scope") private NamespaceIsolationPolicyUnloadScope unloadScope;