This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b8e848bf24345d9eeb5e58bc89af685e7b38b12c
Author: Kai Wang <[email protected]>
AuthorDate: Tue Nov 12 16:59:47 2024 +0800

    [improve][broker] Support cleanup `replication cluster` and `allowed cluster` when cluster metadata teardown (#23561)
    
    (cherry picked from commit 096986933c2627e9067ee6085ab692431db22883)
---
 .../pulsar/PulsarClusterMetadataTeardown.java      | 43 ++++++++++++-
 .../zookeeper/ClusterMetadataTeardownTest.java     | 74 ++++++++++++++++++++--
 2 files changed, 112 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 964a49fe10f..30a0dabea98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar;
 
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
@@ -29,12 +31,18 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TenantResources;
 import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.docs.tools.CmdGenerateDocs;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -153,12 +161,45 @@ public class PulsarClusterMetadataTeardown {
                     
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
                             
.configFilePath(arguments.configurationStoreConfigPath)
                             
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
-            deleteRecursively(configMetadataStore, "/admin/clusters/" + 
arguments.cluster).join();
+            PulsarResources resources = new PulsarResources(metadataStore, 
configMetadataStore);
+            // Cleanup replication cluster from all tenants and namespaces
+            TenantResources tenantResources = resources.getTenantResources();
+            NamespaceResources namespaceResources = 
resources.getNamespaceResources();
+            List<String> tenants = tenantResources.listTenants();
+            for (String tenant : tenants) {
+                List<String> namespaces = 
namespaceResources.listNamespacesAsync(tenant).get();
+                for (String namespace : namespaces) {
+                    namespaceResources.setPolicies(NamespaceName.get(tenant, 
namespace), policies -> {
+                        
policies.replication_clusters.remove(arguments.cluster);
+                        return policies;
+                    });
+                }
+                removeCurrentClusterFromAllowedClusters(tenantResources, 
tenant, arguments.cluster);
+            }
+            try {
+                
resources.getClusterResources().deleteCluster(arguments.cluster);
+            } catch (MetadataStoreException.NotFoundException ex) {
+                // Ignore if the cluster does not exist
+                log.info("Cluster metadata for '{}' does not exist.", 
arguments.cluster);
+            }
         }
 
         log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
     }
 
+    private static void removeCurrentClusterFromAllowedClusters(
+            TenantResources tenantResources, String tenant, String curCluster)
+            throws MetadataStoreException, InterruptedException, 
ExecutionException {
+        Optional<TenantInfo> tenantInfoOptional = 
tenantResources.getTenant(tenant);
+        if (tenantInfoOptional.isEmpty()) {
+            return;
+        }
+        tenantResources.updateTenantAsync(tenant, ti -> {
+            ti.getAllowedClusters().remove(curCluster);
+            return ti;
+        }).get();
+    }
+
     private static CompletableFuture<Void> deleteRecursively(MetadataStore 
metadataStore, String path) {
         return metadataStore.getChildren(path)
                 .thenCompose(children -> FutureUtil.waitForAll(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java
index 5184afade9c..c689bb60fed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.zookeeper;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import java.util.SortedMap;
 import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.PulsarClusterMetadataTeardown;
@@ -54,7 +55,7 @@ public class ClusterMetadataTeardownTest {
     @Test
     public void testSetupClusterMetadataAndTeardown() throws Exception {
         String[] args1 = {
-                "--cluster", "testReSetupClusterMetadata-cluster",
+                "--cluster", "cluster1",
                 "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
                 "--configuration-store", "127.0.0.1:" + 
localZkS.getZookeeperPort(),
                 "--configuration-metadata-store-config-path", 
"src/test/resources/conf/zk_client_enable_sasl.conf",
@@ -65,7 +66,7 @@ public class ClusterMetadataTeardownTest {
         };
         PulsarClusterMetadataSetup.main(args1);
         SortedMap<String, String> data1 = localZkS.dumpData();
-        String clusterDataJson = 
data1.get("/admin/clusters/testReSetupClusterMetadata-cluster");
+        String clusterDataJson = data1.get("/admin/clusters/cluster1");
         assertNotNull(clusterDataJson);
         ClusterData clusterData = ObjectMapperFactory
                 .getMapper()
@@ -78,13 +79,78 @@ public class ClusterMetadataTeardownTest {
         assertFalse(clusterData.isBrokerClientTlsEnabled());
 
         String[] args2 = {
-                "--cluster", "testReSetupClusterMetadata-cluster",
+                "--cluster", "cluster1",
                 "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
                 "--configuration-store", "127.0.0.1:" + 
localZkS.getZookeeperPort(),
                 "--configuration-metadata-store-config-path", 
"src/test/resources/conf/zk_client_enable_sasl.conf",
         };
         PulsarClusterMetadataTeardown.main(args2);
         SortedMap<String, String> data2 = localZkS.dumpData();
-        
assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster"));
+        assertFalse(data2.containsKey("/admin/clusters/cluster1"));
+    }
+
+    @Test
+    public void testSetupMultipleClusterMetadataAndTeardown() throws Exception 
{
+        String[] cluster1Args = {
+                "--cluster", "cluster1",
+                "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
+                "--configuration-store", "127.0.0.1:" + 
localZkS.getZookeeperPort(),
+                "--configuration-metadata-store-config-path", 
"src/test/resources/conf/zk_client_enable_sasl.conf",
+                "--web-service-url", "http://127.0.0.1:8080",
+                "--web-service-url-tls", "https://127.0.0.1:8443",
+                "--broker-service-url", "pulsar://127.0.0.1:6650",
+                "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
+        };
+        PulsarClusterMetadataSetup.main(cluster1Args);
+        String[] cluster2Args = {
+                "--cluster", "cluster2",
+                "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
+                "--configuration-store", "127.0.0.1:" + 
localZkS.getZookeeperPort(),
+                "--configuration-metadata-store-config-path", 
"src/test/resources/conf/zk_client_enable_sasl.conf",
+                "--web-service-url", "http://127.0.0.1:8081",
+                "--web-service-url-tls", "https://127.0.0.1:8445",
+                "--broker-service-url", "pulsar://127.0.0.1:6651",
+                "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6652"
+        };
+        PulsarClusterMetadataSetup.main(cluster2Args);
+        SortedMap<String, String> data1 = localZkS.dumpData();
+        String clusterDataJson = data1.get("/admin/clusters/cluster1");
+        assertNotNull(clusterDataJson);
+        ClusterData clusterData = ObjectMapperFactory
+                .getMapper()
+                .reader()
+                .readValue(clusterDataJson, ClusterData.class);
+        assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
+        assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
+        assertEquals(clusterData.getBrokerServiceUrl(), 
"pulsar://127.0.0.1:6650");
+        assertEquals(clusterData.getBrokerServiceUrlTls(), 
"pulsar+ssl://127.0.0.1:6651");
+        assertFalse(clusterData.isBrokerClientTlsEnabled());
+
+        String[] args2 = {
+                "--cluster", "cluster1",
+                "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
+                "--configuration-store", "127.0.0.1:" + 
localZkS.getZookeeperPort(),
+                "--configuration-metadata-store-config-path", 
"src/test/resources/conf/zk_client_enable_sasl.conf",
+        };
+        PulsarClusterMetadataTeardown.main(args2);
+        SortedMap<String, String> data2 = localZkS.dumpData();
+        assertFalse(data2.containsKey("/admin/clusters/cluster1"));
+        assertTrue(data2.containsKey("/admin/clusters/cluster2"));
+
+        assertTrue(data2.containsKey("/admin/policies/public"));
+        assertFalse(data2.get("/admin/policies/public").contains("cluster1"));
+        assertTrue(data2.get("/admin/policies/public").contains("cluster2"));
+
+        assertTrue(data2.containsKey("/admin/policies/pulsar"));
+        assertFalse(data2.get("/admin/policies/pulsar").contains("cluster1"));
+        assertTrue(data2.get("/admin/policies/pulsar").contains("cluster2"));
+
+        assertTrue(data2.containsKey("/admin/policies/public/default"));
+        
assertFalse(data2.get("/admin/policies/public/default").contains("cluster1"));
+        
assertTrue(data2.get("/admin/policies/public/default").contains("cluster2"));
+
+        assertTrue(data2.containsKey("/admin/policies/pulsar/system"));
+        
assertFalse(data2.get("/admin/policies/pulsar/system").contains("cluster1"));
+        
assertTrue(data2.get("/admin/policies/pulsar/system").contains("cluster2"));
     }
 }

Reply via email to