This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa5e7d9fbe [fix] [broker] fix replicated namespaces filter in 
filterAndUnloadMatchedNamespaceAsync (#23100)
5aa5e7d9fbe is described below

commit 5aa5e7d9fbe55d7e22625d67e618aa4934c78ecb
Author: Anshul Singh <[email protected]>
AuthorDate: Wed Jul 31 11:39:41 2024 +0530

    [fix] [broker] fix replicated namespaces filter in 
filterAndUnloadMatchedNamespaceAsync (#23100)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../pulsar/broker/admin/impl/ClustersBase.java     |  77 ++++++++------
 ...AdminApiNamespaceIsolationMultiBrokersTest.java | 114 +++++++++++++++++++++
 2 files changed, 158 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 6eb324a63f3..4fe8a01e679 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -33,8 +33,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -723,8 +723,8 @@ public class ClustersBase extends AdminResource {
                 ).thenCompose(nsIsolationPolicies -> {
                     nsIsolationPolicies.setPolicy(policyName, policyData);
                     return namespaceIsolationPolicies()
-                                    .setIsolationDataAsync(cluster, old -> 
nsIsolationPolicies.getPolicies());
-                }).thenCompose(__ -> 
filterAndUnloadMatchedNamespaceAsync(policyData))
+                            .setIsolationDataAsync(cluster, old -> 
nsIsolationPolicies.getPolicies());
+                }).thenCompose(__ -> 
filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
                 .thenAccept(__ -> {
                     log.info("[{}] Successful to update 
clusters/{}/namespaceIsolationPolicies/{}.",
                             clientAppId(), cluster, policyName);
@@ -758,42 +758,53 @@ public class ClustersBase extends AdminResource {
     /**
      * Get matched namespaces; call unload for each namespaces.
      */
-    private CompletableFuture<Void> 
filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
+    private CompletableFuture<Void> 
filterAndUnloadMatchedNamespaceAsync(String cluster,
+                                                                         
NamespaceIsolationDataImpl policyData) {
         PulsarAdmin adminClient;
         try {
             adminClient = pulsar().getAdminClient();
         } catch (PulsarServerException e) {
             return FutureUtil.failedFuture(e);
         }
-        return adminClient.tenants().getTenantsAsync()
-                .thenCompose(tenants -> {
-                    Stream<CompletableFuture<List<String>>> 
completableFutureStream = tenants.stream()
-                            .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant));
-                    return FutureUtil.waitForAll(completableFutureStream)
-                            .thenApply(namespaces -> {
-                                // if namespace match any policy regex, add it 
to ns list to be unload.
-                                return namespaces.stream()
-                                        .filter(namespaceName ->
-                                                
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
-                                        .collect(Collectors.toList());
-                            });
-                }).thenCompose(shouldUnloadNamespaces -> {
-                    if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    List<CompletableFuture<Void>> futures = 
shouldUnloadNamespaces.stream()
-                            .map(namespaceName -> 
adminClient.namespaces().unloadAsync(namespaceName))
-                            .collect(Collectors.toList());
-                    return FutureUtil.waitForAll(futures)
-                            .thenAccept(__ -> {
-                                try {
-                                    // write load info to load manager to make 
the load happens fast
-                                    
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
-                                } catch (Exception e) {
-                                    log.warn("[{}] Failed to 
writeLoadReportOnZookeeper.", clientAppId(), e);
-                                }
-                            });
-                });
+        // compile regex patterns once
+        List<Pattern> namespacePatterns = 
policyData.getNamespaces().stream().map(Pattern::compile).toList();
+        return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
+            List<CompletableFuture<List<String>>> 
filteredNamespacesForEachTenant = tenants.stream()
+                    .map(tenant -> 
adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
+                        List<CompletableFuture<String>> 
namespaceNamesInCluster = namespaces.stream()
+                                .filter(namespaceName -> 
namespacePatterns.stream()
+                                        .anyMatch(pattern -> 
pattern.matcher(namespaceName).matches()))
+                                .map(namespaceName -> 
adminClient.namespaces().getPoliciesAsync(namespaceName)
+                                        .thenApply(policies -> 
policies.replication_clusters.contains(cluster)
+                                                ? namespaceName : null))
+                                .collect(Collectors.toList());
+                        return 
FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
+                                __ -> namespaceNamesInCluster.stream()
+                                        .map(CompletableFuture::join)
+                                        .filter(Objects::nonNull)
+                                        .collect(Collectors.toList()));
+                    })).toList();
+            return FutureUtil.waitForAll(filteredNamespacesForEachTenant)
+                    .thenApply(__ -> filteredNamespacesForEachTenant.stream()
+                            .map(CompletableFuture::join)
+                            .flatMap(List::stream)
+                            .collect(Collectors.toList()));
+        }).thenCompose(shouldUnloadNamespaces -> {
+            if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
+                return CompletableFuture.completedFuture(null);
+            }
+            List<CompletableFuture<Void>> futures = 
shouldUnloadNamespaces.stream()
+                    .map(namespaceName -> 
adminClient.namespaces().unloadAsync(namespaceName))
+                    .collect(Collectors.toList());
+            return FutureUtil.waitForAll(futures).thenAccept(__ -> {
+                try {
+                    // write load info to load manager to make the load 
happens fast
+                    
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+                } catch (Exception e) {
+                    log.warn("[{}] Failed to writeLoadReportOnZookeeper.", 
clientAppId(), e);
+                }
+            });
+        });
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
new file mode 100644
index 00000000000..da7d95d677a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test multi-broker admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiNamespaceIsolationMultiBrokersTest extends 
MultiBrokerBaseTest {
+
+    PulsarAdmin localAdmin;
+    PulsarAdmin remoteAdmin;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerMaxEntriesPerLedger(10);
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+    }
+
+    @BeforeClass
+    public void setupClusters() throws Exception {
+        localAdmin = getAllAdmins().get(1);
+        remoteAdmin = getAllAdmins().get(2);
+        String localBrokerWebService = 
additionalPulsarTestContexts.get(0).getPulsarService().getWebServiceAddress();
+        String remoteBrokerWebService = 
additionalPulsarTestContexts.get(1).getPulsarService().getWebServiceAddress();
+        localAdmin.clusters()
+                .createCluster("cluster-1", 
ClusterData.builder().serviceUrl(localBrokerWebService).build());
+        remoteAdmin.clusters()
+                .createCluster("cluster-2", 
ClusterData.builder().serviceUrl(remoteBrokerWebService).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), 
Set.of("test", "cluster-1", "cluster-2"));
+        localAdmin.tenants().createTenant("prop-ig", tenantInfo);
+        localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", 
"cluster-1"));
+    }
+
+    public void testNamespaceIsolationPolicyForReplNS() throws Exception {
+
+        // Verify that namespace is not present in cluster-2.
+        Set<String> replicationClusters = 
localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
+        Assert.assertFalse(replicationClusters.contains("cluster-2"));
+
+        // setup ns-isolation-policy in both the clusters.
+        String policyName1 = "policy-1";
+        Map<String, String> parameters1 = new HashMap<>();
+        parameters1.put("min_limit", "1");
+        parameters1.put("usage_threshold", "100");
+        List<String> nsRegexList = new 
ArrayList<>(Arrays.asList("prop-ig/.*"));
+
+        NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
+                // "prop-ig/ns1" is present in test cluster, policy set on 
test2 should work
+                .namespaces(nsRegexList)
+                .primary(Collections.singletonList(".*"))
+                .secondary(Collections.singletonList(""))
+                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+                        .policyType(AutoFailoverPolicyType.min_available)
+                        .parameters(parameters1)
+                        .build())
+                .build();
+
+        localAdmin.clusters().createNamespaceIsolationPolicy("test", 
policyName1, nsPolicyData1);
+        // verify policy is present in local cluster
+        Map<String, ? extends NamespaceIsolationData> policiesMap =
+                localAdmin.clusters().getNamespaceIsolationPolicies("test");
+        assertEquals(policiesMap.get(policyName1), nsPolicyData1);
+
+        remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", 
policyName1, nsPolicyData1);
+        // verify policy is present in remote cluster
+        policiesMap = 
remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2");
+        assertEquals(policiesMap.get(policyName1), nsPolicyData1);
+
+    }
+
+}

Reply via email to