This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa5e7d9fbe [fix] [broker] fix replicated namespaces filter in filterAndUnloadMatchedNamespaceAsync (#23100)
5aa5e7d9fbe is described below
commit 5aa5e7d9fbe55d7e22625d67e618aa4934c78ecb
Author: Anshul Singh <[email protected]>
AuthorDate: Wed Jul 31 11:39:41 2024 +0530
[fix] [broker] fix replicated namespaces filter in filterAndUnloadMatchedNamespaceAsync (#23100)
Co-authored-by: Lari Hotari <[email protected]>
---
.../pulsar/broker/admin/impl/ClustersBase.java | 77 ++++++++------
...AdminApiNamespaceIsolationMultiBrokersTest.java | 114 +++++++++++++++++++++
2 files changed, 158 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 6eb324a63f3..4fe8a01e679 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -33,8 +33,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -723,8 +723,8 @@ public class ClustersBase extends AdminResource {
).thenCompose(nsIsolationPolicies -> {
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
- .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
- }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData))
+ .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
+ }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
@@ -758,42 +758,53 @@ public class ClustersBase extends AdminResource {
/**
* Get matched namespaces; call unload for each namespaces.
*/
- private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
+ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
+ NamespaceIsolationDataImpl policyData) {
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
- return adminClient.tenants().getTenantsAsync()
- .thenCompose(tenants -> {
- Stream<CompletableFuture<List<String>>> completableFutureStream = tenants.stream()
- .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant));
- return FutureUtil.waitForAll(completableFutureStream)
- .thenApply(namespaces -> {
- // if namespace match any policy regex, add it to ns list to be unload.
- return namespaces.stream()
- .filter(namespaceName ->
- policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
- .collect(Collectors.toList());
- });
- }).thenCompose(shouldUnloadNamespaces -> {
- if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
- return CompletableFuture.completedFuture(null);
- }
- List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
- .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
- .collect(Collectors.toList());
- return FutureUtil.waitForAll(futures)
- .thenAccept(__ -> {
- try {
- // write load info to load manager to make the load happens fast
- pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
- } catch (Exception e) {
- log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
- }
- });
- });
+ // compile regex patterns once
+ List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
+ return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
+ List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
+ .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
+ List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
+ .filter(namespaceName -> namespacePatterns.stream()
+ .anyMatch(pattern -> pattern.matcher(namespaceName).matches()))
+ .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
+ .thenApply(policies -> policies.replication_clusters.contains(cluster)
+ ? namespaceName : null))
+ .collect(Collectors.toList());
+ return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
+ __ -> namespaceNamesInCluster.stream()
+ .map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList()));
+ })).toList();
+ return FutureUtil.waitForAll(filteredNamespacesForEachTenant)
+ .thenApply(__ -> filteredNamespacesForEachTenant.stream()
+ .map(CompletableFuture::join)
+ .flatMap(List::stream)
+ .collect(Collectors.toList()));
+ }).thenCompose(shouldUnloadNamespaces -> {
+ if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
+ .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
+ .collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenAccept(__ -> {
+ try {
+ // write load info to load manager to make the load happens fast
+ pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
+ }
+ });
+ });
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
new file mode 100644
index 00000000000..da7d95d677a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test multi-broker admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseTest {
+
+ PulsarAdmin localAdmin;
+ PulsarAdmin remoteAdmin;
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setManagedLedgerMaxEntriesPerLedger(10);
+ }
+
+ @Override
+ protected void onCleanup() {
+ super.onCleanup();
+ }
+
+ @BeforeClass
+ public void setupClusters() throws Exception {
+ localAdmin = getAllAdmins().get(1);
+ remoteAdmin = getAllAdmins().get(2);
+ String localBrokerWebService = additionalPulsarTestContexts.get(0).getPulsarService().getWebServiceAddress();
+ String remoteBrokerWebService = additionalPulsarTestContexts.get(1).getPulsarService().getWebServiceAddress();
+ localAdmin.clusters()
+ .createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build());
+ remoteAdmin.clusters()
+ .createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2"));
+ localAdmin.tenants().createTenant("prop-ig", tenantInfo);
+ localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1"));
+ }
+
+ public void testNamespaceIsolationPolicyForReplNS() throws Exception {
+
+ // Verify that namespace is not present in cluster-2.
+ Set<String> replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
+ Assert.assertFalse(replicationClusters.contains("cluster-2"));
+
+ // setup ns-isolation-policy in both the clusters.
+ String policyName1 = "policy-1";
+ Map<String, String> parameters1 = new HashMap<>();
+ parameters1.put("min_limit", "1");
+ parameters1.put("usage_threshold", "100");
+ List<String> nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*"));
+
+ NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
+ // "prop-ig/ns1" is present in test cluster, policy set on test2 should work
+ .namespaces(nsRegexList)
+ .primary(Collections.singletonList(".*"))
+ .secondary(Collections.singletonList(""))
+ .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+ .policyType(AutoFailoverPolicyType.min_available)
+ .parameters(parameters1)
+ .build())
+ .build();
+
+ localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
+ // verify policy is present in local cluster
+ Map<String, ? extends NamespaceIsolationData> policiesMap =
+ localAdmin.clusters().getNamespaceIsolationPolicies("test");
+ assertEquals(policiesMap.get(policyName1), nsPolicyData1);
+
+ remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1);
+ // verify policy is present in remote cluster
+ policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2");
+ assertEquals(policiesMap.get(policyName1), nsPolicyData1);
+
+ }
+
+}