This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6e5fa84e402273e576ac8763d48c131afd5ea9c0 Author: Anshul Singh <[email protected]> AuthorDate: Wed Jul 31 11:39:41 2024 +0530 [fix] [broker] fix replicated namespaces filter in filterAndUnloadMatchedNamespaceAsync (#23100) Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 5aa5e7d9fbe55d7e22625d67e618aa4934c78ecb) --- .../pulsar/broker/admin/impl/ClustersBase.java | 77 ++++++++------ ...AdminApiNamespaceIsolationMultiBrokersTest.java | 114 +++++++++++++++++++++ 2 files changed, 158 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 6eb324a63f3..4fe8a01e679 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -33,8 +33,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -723,8 +723,8 @@ public class ClustersBase extends AdminResource { ).thenCompose(nsIsolationPolicies -> { nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() - .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData)) + .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); + }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData)) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -758,42 +758,53 @@ public class ClustersBase extends AdminResource { /** * Get matched namespaces; call unload for each namespaces. */ - private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) { + private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster, + NamespaceIsolationDataImpl policyData) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } - return adminClient.tenants().getTenantsAsync() - .thenCompose(tenants -> { - Stream<CompletableFuture<List<String>>> completableFutureStream = tenants.stream() - .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant)); - return FutureUtil.waitForAll(completableFutureStream) - .thenApply(namespaces -> { - // if namespace match any policy regex, add it to ns list to be unload. - return namespaces.stream() - .filter(namespaceName -> - policyData.getNamespaces().stream().anyMatch(namespaceName::matches)) - .collect(Collectors.toList()); - }); - }).thenCompose(shouldUnloadNamespaces -> { - if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) { - return CompletableFuture.completedFuture(null); - } - List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream() - .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) - .collect(Collectors.toList()); - return FutureUtil.waitForAll(futures) - .thenAccept(__ -> { - try { - // write load info to load manager to make the load happens fast - pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true); - } catch (Exception e) { - log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e); - } - }); - }); + // compile regex patterns once + List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); + return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { + List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream() + .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { + List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream() + .filter(namespaceName -> namespacePatterns.stream() + .anyMatch(pattern -> pattern.matcher(namespaceName).matches())) + .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) + .thenApply(policies -> policies.replication_clusters.contains(cluster) + ? namespaceName : null)) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply( + __ -> namespaceNamesInCluster.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + })).toList(); + return FutureUtil.waitForAll(filteredNamespacesForEachTenant) + .thenApply(__ -> filteredNamespacesForEachTenant.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); + }).thenCompose(shouldUnloadNamespaces -> { + if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) { + return CompletableFuture.completedFuture(null); + } + List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream() + .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenAccept(__ -> { + try { + // write load info to load manager to make the load happens fast + pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true); + } catch (Exception e) { + log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e); + } + }); + }); } @DELETE diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java new file mode 100644 index 00000000000..da7d95d677a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.MultiBrokerBaseTest; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test multi-broker admin api. + */ +@Slf4j +@Test(groups = "broker-admin") +public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseTest { + + PulsarAdmin localAdmin; + PulsarAdmin remoteAdmin; + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setManagedLedgerMaxEntriesPerLedger(10); + } + + @Override + protected void onCleanup() { + super.onCleanup(); + } + + @BeforeClass + public void setupClusters() throws Exception { + localAdmin = getAllAdmins().get(1); + remoteAdmin = getAllAdmins().get(2); + String localBrokerWebService = additionalPulsarTestContexts.get(0).getPulsarService().getWebServiceAddress(); + String remoteBrokerWebService = additionalPulsarTestContexts.get(1).getPulsarService().getWebServiceAddress(); + localAdmin.clusters() + .createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build()); + remoteAdmin.clusters() + .createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2")); + localAdmin.tenants().createTenant("prop-ig", tenantInfo); + localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1")); + } + + public void testNamespaceIsolationPolicyForReplNS() throws Exception { + + // Verify that namespace is not present in cluster-2. + Set<String> replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters; + Assert.assertFalse(replicationClusters.contains("cluster-2")); + + // setup ns-isolation-policy in both the clusters. + String policyName1 = "policy-1"; + Map<String, String> parameters1 = new HashMap<>(); + parameters1.put("min_limit", "1"); + parameters1.put("usage_threshold", "100"); + List<String> nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*")); + + NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() + // "prop-ig/ns1" is present in test cluster, policy set on test2 should work + .namespaces(nsRegexList) + .primary(Collections.singletonList(".*")) + .secondary(Collections.singletonList("")) + .autoFailoverPolicy(AutoFailoverPolicyData.builder() + .policyType(AutoFailoverPolicyType.min_available) + .parameters(parameters1) + .build()) + .build(); + + localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1); + // verify policy is present in local cluster + Map<String, ? extends NamespaceIsolationData> policiesMap = + localAdmin.clusters().getNamespaceIsolationPolicies("test"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + + remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1); + // verify policy is present in remote cluster + policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + + } + +}
