This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9a25e599eef808cc71b48b999987df55abb0070f
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jan 13 00:52:38 2022 +0800

    [ Issue 13479 ] Fixed internal topic effect by InactiveTopicPolicy. (#13611)
    
    (cherry picked from commit 5835191295371b3a5f49ed1019a8ad197554424e)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |  3 +-
 .../broker/service/persistent/SystemTopic.java     |  5 +++
 .../pulsar/broker/systopic/SystemTopicClient.java  |  6 ++-
 .../broker/service/InactiveTopicDeleteTest.java    | 49 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 756d141..bb9453e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -70,6 +70,7 @@ import org.slf4j.LoggerFactory;
 public class BrokersBase extends PulsarWebResource {
     private static final Logger LOG = 
LoggerFactory.getLogger(BrokersBase.class);
     private static final Duration HEALTHCHECK_READ_TIMEOUT = 
Duration.ofSeconds(10);
+    public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
 
     @GET
     @Path("/{cluster}")
@@ -317,7 +318,7 @@ public class BrokersBase extends PulsarWebResource {
                             pulsar().getConfiguration());
 
 
-            topic = String.format("persistent://%s/healthcheck", 
heartbeatNamespace);
+            topic = String.format("persistent://%s/%s", heartbeatNamespace, 
HEALTH_CHECK_TOPIC_SUFFIX);
 
             LOG.info("Running healthCheck with topic={}", topic);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 6e3173f..f1d7b06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -31,6 +31,11 @@ public class SystemTopic extends PersistentTopic {
     }
 
     @Override
+    public boolean isDeleteWhileInactive() {
+        return false;
+    }
+
+    @Override
     public boolean isSizeBacklogExceeded() {
         return false;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index e838a69..122bd9f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -201,7 +202,10 @@ public interface SystemTopicClient<T> {
         if (StringUtils.endsWith(localName, 
MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
             return true;
         }
-
+        // health check topic
+        if (StringUtils.endsWith(localName, 
BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){
+            return true;
+        }
         return false;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 94ac6e8..3b7d9aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
@@ -576,4 +578,51 @@ public class InactiveTopicDeleteTest extends 
BrokerTestBase {
         Awaitility.await().untilAsserted(()
                 -> assertEquals(admin.topics().getInactiveTopicPolicies(topic, 
true), brokerLevelPolicy));
     }
+
+    @Test(timeOut = 30000)
+    public void testInternalTopicInactiveNotClean() throws Exception {
+        conf.setSystemTopicEnabled(true);
+        
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+        super.baseSetup();
+        // init topic
+        final String healthCheckTopic = "persistent://prop/ns-abc/"+ 
BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
+        final String topic = 
"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        Producer<byte[]> heathCheckProducer = pulsarClient.newProducer()
+                .topic(healthCheckTopic)
+                .create();
+        Consumer<byte[]> heathCheckConsumer = pulsarClient.newConsumer()
+                .topic(healthCheckTopic)
+                .subscriptionName("healthCheck")
+                .subscribe();
+
+        consumer.close();
+        producer.close();
+        heathCheckConsumer.close();
+        heathCheckProducer.close();
+
+        Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(admin.topics().getList("prop/ns-abc")
+                .contains(topic)));
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic));
+        });
+
+        admin.topics().deleteSubscription(topic, "sub");
+        admin.topics().deleteSubscription(healthCheckTopic, "healthCheck");
+
+        Awaitility.await().untilAsserted(() -> 
Assert.assertFalse(admin.topics().getList("prop/ns-abc")
+                .contains(topic)));
+        Awaitility.await().pollDelay(2, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
Assert.assertTrue(admin.topics().getList("prop/ns-abc")
+                        .contains(healthCheckTopic)));
+    }
 }

Reply via email to