This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a3c5feb78bd [fix][broker] The feature 
brokerDeleteInactivePartitionedTopicMetadataEnabled leaves orphan topic 
policies and topic schemas (#24150)
a3c5feb78bd is described below

commit a3c5feb78bd2330d00933def684f5e50bf6ff221
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 9 14:29:31 2025 +0800

    [fix][broker] The feature 
brokerDeleteInactivePartitionedTopicMetadataEnabled leaves orphan topic 
policies and topic schemas (#24150)
---
 conf/broker.conf                                   |   5 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 +-
 .../broker/service/persistent/PersistentTopic.java |  10 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  14 ++
 .../broker/service/OneWayReplicatorTestBase.java   |  17 ++
 .../broker/service/ReplicationTopicGcTest.java     | 192 +++++++++++++++++++++
 .../ReplicationTopicGcUsingGlobalZKTest.java       |  63 +++++++
 7 files changed, 304 insertions(+), 5 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 02fde814906..cfa7013cfc6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -218,8 +218,11 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
 brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
 
 # Metadata of inactive partitioned topic will not be cleaned up automatically 
by default.
-# Note: If `allowAutoTopicCreation` and this option are enabled at the same 
time,
+# Note 1: If `allowAutoTopicCreation` and this option are enabled at the same 
time,
 # it may appear that a partitioned topic has just been deleted but is 
automatically created as a non-partitioned topic.
+# Note 2: Activating bidirectional geo-replication under global configuration 
ZooKeeper may lead to schema remnants and
+# abnormal topic-level policies.
+# Note 3: Activating bidirectional geo-replication under global configuration 
ZooKeeper may lead to a consumption issue.
 brokerDeleteInactivePartitionedTopicMetadataEnabled=false
 
 # Max duration of topic inactivity in seconds, default is not present
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2b1b799fa20..0c2ac4635f1 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -694,9 +694,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             category = CATEGORY_POLICIES,
             dynamic = true,
             doc = "Metadata of inactive partitioned topic will not be 
automatically cleaned up by default.\n"
-            + "Note: If `allowAutoTopicCreation` and this option are enabled 
at the same time,\n"
+            + "Note 1: If `allowAutoTopicCreation` and this option are enabled 
at the same time,\n"
             + "it may appear that a partitioned topic has just been deleted 
but is automatically created as a "
-                    + "non-partitioned topic."
+            + "non-partitioned topic.\n"
+            + "Note 2: Activating bidirectional geo-replication under global 
ZooKeeper configuration may lead to schema"
+            + " remnants and abnormal topic-level policies.\n"
+            + "Note 3: Activating bidirectional geo-replication under global 
configuration ZooKeeper may lead"
+            + " to a consumption issue."
     )
     private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = 
false;
     @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 811388a692b..6738cfdce3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3246,8 +3246,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                             
String.format("Another partition exists for [%s].",
                                                                     
topicName));
                                                 } else {
-                                                    return 
partitionedTopicResources
-                                                            
.deletePartitionedTopicAsync(topicName);
+                                                    try {
+                                                        return 
brokerService.getPulsar().getAdminClient().topics()
+                                                                
.deletePartitionedTopicAsync(topicName.toString());
+                                                    } catch 
(PulsarServerException e) {
+                                                        log.info("[{}] Delete 
topic metadata failed due to failed to"
+                                                                + " get 
internal admin client.", topicName, e);
+                                                        return 
CompletableFuture.failedFuture(e);
+                                                    }
                                                 }
                                             });
                                 }))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3f5ee721a7e..87c48e961db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3744,4 +3744,18 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(permissions11.isEmpty());
         Assert.assertTrue(permissions22.isEmpty());
     }
+
+    @Test
+    public void testDeletePatchyPartitionedTopic() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(defaultNamespace + 
"/tp");
+        admin.topics().createPartitionedTopic(topic, 2);
+        Producer producer = 
pulsarClient.newProducer().topic(TopicName.get(topic).getPartition(0).toString())
+                .create();
+        // Mock a scenario that "-partition-1" has been removed due to topic 
GC.
+        
pulsar.getBrokerService().getTopic(TopicName.get(topic).getPartition(1).toString(),
 false)
+                .get().get().delete().join();
+        // Verify: delete partitioned topic.
+        producer.close();
+        admin.topics().deletePartitionedTopicAsync(topic, false).get();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 200c8dd3b3d..fdb01fc867a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -480,4 +480,21 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
             admin2.topics().delete(topicName.toString());
         }
     }
+
+    protected void waitReplicatorStopped(PulsarService sourceCluster, 
PulsarService targetCluster, String topicName) {
+        Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+            Optional<Topic> topicOptional2 = 
targetCluster.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
topicOptional2.get();
+            for (org.apache.pulsar.broker.service.Producer producer : 
persistentTopic2.getProducers().values()) {
+                assertFalse(producer.getProducerName()
+                        
.startsWith(targetCluster.getConfiguration().getReplicatorPrefix()));
+            }
+            Optional<Topic> topicOptional1 = 
sourceCluster.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional1.isPresent());
+            PersistentTopic persistentTopic1 = (PersistentTopic) 
topicOptional1.get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty()
+                    || 
!persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
new file mode 100644
index 00000000000..267e8547914
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ReplicationTopicGcTest extends OneWayReplicatorTestBase {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @DataProvider(name = "topicTypes")
+    public Object[][] topicTypes() {
+        return new Object[][]{
+                {TopicType.NON_PARTITIONED},
+                {TopicType.PARTITIONED}
+        };
+    }
+
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+                                     LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, 
brokerConfigZk);
+        
config.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        config.setBrokerDeleteInactiveTopicsEnabled(true);
+        config.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+        config.setBrokerDeleteInactiveTopicsFrequencySeconds(5);
+        config.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(5);
+        config.setReplicationPolicyCheckDurationSeconds(1);
+    }
+
+    @Test(dataProvider = "topicTypes")
+    public void testTopicGC(TopicType topicType) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        final String schemaId = TopicName.get(topicName).getSchemaName();
+        final String subTopic = topicType == TopicType.NON_PARTITIONED ? 
topicName
+                : TopicName.get(topicName).getPartition(0).toString();
+        if (topicType == TopicType.NON_PARTITIONED) {
+            admin1.topics().createNonPartitionedTopic(topicName);
+        } else {
+            admin1.topics().createPartitionedTopic(topicName, 1);
+        }
+
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
+        // Wait for replicator started.
+        waitReplicatorStarted(subTopic);
+
+        // Trigger a topic level policies.
+        PublishRate publishRate = new PublishRate(1000, 1024 * 1024);
+        admin1.topicPolicies().setPublishRate(topicName, publishRate);
+        admin2.topicPolicies().setPublishRate(topicName, publishRate);
+        // Write a schema.
+        // Since there is a producer registered one the source cluster, 
skipped to write a schema.
+        admin2.schemas().createSchema(topicName, 
Schema.STRING.getSchemaInfo());
+
+        // Trigger GC through close all clients.
+        producer1.close();
+        // Verify: all resources were deleted.
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            // sub topic.
+            assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(TopicName.get(subTopic)).get());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(TopicName.get(subTopic)).get());
+            // partitioned topic.
+            assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
+                    
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+            assertFalse(pulsar2.getPulsarResources().getNamespaceResources()
+                    
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+            // topic policies.
+            
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+            
assertTrue(pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+            // schema.
+            
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
+            
assertTrue(CollectionUtils.isEmpty(pulsar2.getSchemaStorage().getAll(schemaId).get()));
+        });
+    }
+
+    @Test(dataProvider = "topicTypes")
+    public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType 
topicType) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        final String subscription = "s1";
+        final String schemaId = TopicName.get(topicName).getSchemaName();
+        final String subTopic = topicType == TopicType.NON_PARTITIONED ? 
topicName
+                : TopicName.get(topicName).getPartition(0).toString();
+        if (topicType == TopicType.NON_PARTITIONED) {
+            admin1.topics().createNonPartitionedTopic(topicName);
+        } else {
+            admin1.topics().createPartitionedTopic(topicName, 1);
+        }
+
+        // Wait for replicator started.
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
+        waitReplicatorStarted(subTopic);
+        admin2.topics().createSubscription(topicName, subscription, 
MessageId.earliest);
+
+        if (usingGlobalZK) {
+            admin2.topics().setReplicationClusters(topicName, 
Collections.singletonList(cluster2));
+            waitReplicatorStopped(pulsar2, pulsar1, subTopic);
+        }
+
+        // Send a message
+        producer1.send("msg-1");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin2.topics().getStats(subTopic).getSubscriptions().get(subscription).getMsgBacklog(),
 1);
+        });
+
+        // Trigger GC through close all clients.
+        producer1.close();
+        // Verify: the topic was removed on the source cluster.
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            // sub topic.
+            assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(TopicName.get(subTopic)).get());
+            // partitioned topic.
+            assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
+                    
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+            // topic policies.
+            
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+            // schema.
+            
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
+        });
+
+        // Verify: consumer still can consume messages from the remote cluster.
+        org.apache.pulsar.client.api.Consumer<String> consumer =
+                
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription).subscribe();
+        Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), "msg-1");
+
+        // Cleanup.
+        consumer.close();
+        if (topicType == TopicType.NON_PARTITIONED) {
+            admin2.topics().delete(topicName);
+        } else {
+            admin2.topics().deletePartitionedTopic(topicName);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
new file mode 100644
index 00000000000..ce6eddc5698
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ReplicationTopicGcUsingGlobalZKTest extends 
ReplicationTopicGcTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.usingGlobalZK = true;
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Test(dataProvider = "topicTypes")
+    public void testTopicGC(TopicType topicType) throws Exception {
+        if (topicType.equals(TopicType.PARTITIONED)) {
+            // Pulsar does not support the feature 
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
+            // Geo-Replication with Global ZK.
+            return;
+        }
+        super.testTopicGC(topicType);
+    }
+
+    @Test(dataProvider = "topicTypes")
+    public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType 
topicType) throws Exception {
+        if (topicType.equals(TopicType.PARTITIONED)) {
+            // Pulsar does not support the feature 
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
+            // Geo-Replication with Global ZK.
+            return;
+        }
+        super.testRemoteClusterStillConsumeAfterCurrentClusterGc(topicType);
+    }
+}

Reply via email to