This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f451148 Clean the metadata of the non-persistent partitioned topics. (#12549)
f451148 is described below
commit f4511489d5501895ab127e3df9f7e74ae2adcc23
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Dec 15 16:21:59 2021 +0800
Clean the metadata of the non-persistent partitioned topics. (#12549)
## Motivation
This fixes metadata cleanup for non-persistent partitioned topics; it is related to #8442.
## Modification
- Add tryToDeletePartitionedMetadata to NonPersistentTopic to clean up the partitioned-topic metadata after the topic is deleted due to inactivity.
---
.../service/nonpersistent/NonPersistentTopic.java | 30 ++++++++++++++++++++++
.../broker/service/NonPersistentTopicE2ETest.java | 14 ++++++++++
2 files changed, 44 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 86aa9fc..3148073 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -429,6 +429,35 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
return deleteFuture;
}
+ private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
+ if (TopicName.get(topic).isPartitioned() &&
!deletePartitionedTopicMetadataWhileInactive()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName topicName =
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+ String path =
AdminResource.path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getNamespace()
+ , topicName.getDomain().value(),
topicName.getEncodedLocalName());
+ try {
+ if (!getBrokerService().pulsar().getGlobalZkCache().exists(path)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture<Void> deleteMetadataFuture = new
CompletableFuture<>();
+
getBrokerService().pulsar().getGlobalZkCache().getZooKeeper().delete(path, -1
+ , (rc, s, o) -> {
+ if (KeeperException.Code.OK.intValue() == rc
+ || KeeperException.Code.NONODE.intValue() ==
rc) {
+
getBrokerService().pulsar().getGlobalZkCache().invalidate(path);
+ deleteMetadataFuture.complete(null);
+ } else {
+ deleteMetadataFuture.completeExceptionally(
+
KeeperException.create(KeeperException.Code.get(rc)));
+ }
+ }, null);
+ return deleteMetadataFuture;
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+
/**
* Close this topic - close all producers and subscriptions associated
with this topic
*
@@ -872,6 +901,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
}
stopReplProducers().thenCompose(v -> delete(true, false,
true))
+ .thenAccept(__ -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted
successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof
TopicBusyException) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index fab7a5e..bd22999 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -55,6 +55,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase
{
@BeforeMethod
@Override
protected void setup() throws Exception {
+ conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
}
@@ -231,5 +233,17 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
producer2.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 6. Test for partitioned topic to delete the partitioned metadata
+ String topicGc = "non-persistent://prop/ns-abc/topic-gc";
+ int partitions = 5;
+ admin.topics().createPartitionedTopic(topicGc, partitions);
+ Producer<byte[]> producer3 =
pulsarClient.newProducer().topic(topicGc).create();
+ producer3.close();
+
assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions == partitions);
+ runGC();
+
assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions == 0);
}
}