This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f451148  Clean the metadata of the non-persistent partitioned topics. 
(#12549)
f451148 is described below

commit f4511489d5501895ab127e3df9f7e74ae2adcc23
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Dec 15 16:21:59 2021 +0800

    Clean the metadata of the non-persistent partitioned topics. (#12549)
    
    ## Motivation
    This is a fix about non-persistent partitioned topics related to #8442.
    
    ## Modification
    - Add tryToDeletePartitionedMetadata to clean up the metadata info.
---
 .../service/nonpersistent/NonPersistentTopic.java  | 30 ++++++++++++++++++++++
 .../broker/service/NonPersistentTopicE2ETest.java  | 14 ++++++++++
 2 files changed, 44 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 86aa9fc..3148073 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -429,6 +429,35 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         return deleteFuture;
     }
 
+    private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
+        if (TopicName.get(topic).isPartitioned() && 
!deletePartitionedTopicMetadataWhileInactive()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName topicName = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        String path = 
AdminResource.path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, 
topicName.getNamespace()
+                , topicName.getDomain().value(), 
topicName.getEncodedLocalName());
+        try {
+            if (!getBrokerService().pulsar().getGlobalZkCache().exists(path)) {
+                return CompletableFuture.completedFuture(null);
+            }
+            CompletableFuture<Void> deleteMetadataFuture = new 
CompletableFuture<>();
+            
getBrokerService().pulsar().getGlobalZkCache().getZooKeeper().delete(path, -1
+                    , (rc, s, o) -> {
+                        if (KeeperException.Code.OK.intValue() == rc
+                                || KeeperException.Code.NONODE.intValue() == 
rc) {
+                            
getBrokerService().pulsar().getGlobalZkCache().invalidate(path);
+                            deleteMetadataFuture.complete(null);
+                        } else {
+                            deleteMetadataFuture.completeExceptionally(
+                                    
KeeperException.create(KeeperException.Code.get(rc)));
+                        }
+                    }, null);
+            return deleteMetadataFuture;
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic
      *
@@ -872,6 +901,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
                     }
 
                     stopReplProducers().thenCompose(v -> delete(true, false, 
true))
+                            .thenAccept(__ -> tryToDeletePartitionedMetadata())
                             .thenRun(() -> log.info("[{}] Topic deleted 
successfully due to inactivity", topic))
                             .exceptionally(e -> {
                                 if (e.getCause() instanceof 
TopicBusyException) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index fab7a5e..bd22999 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -55,6 +55,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase 
{
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
     }
 
@@ -231,5 +233,17 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
         producer2.close();
 
         
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "non-persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        
assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == partitions);
+        runGC();
+        
assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == 0);
     }
 }

Reply via email to