This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9a25e599eef808cc71b48b999987df55abb0070f Author: Qiang Zhao <[email protected]> AuthorDate: Thu Jan 13 00:52:38 2022 +0800 [ Issue 13479 ] Fixed internal topic effect by InactiveTopicPolicy. (#13611) (cherry picked from commit 5835191295371b3a5f49ed1019a8ad197554424e) --- .../pulsar/broker/admin/impl/BrokersBase.java | 3 +- .../broker/service/persistent/SystemTopic.java | 5 +++ .../pulsar/broker/systopic/SystemTopicClient.java | 6 ++- .../broker/service/InactiveTopicDeleteTest.java | 49 ++++++++++++++++++++++ 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 756d141..bb9453e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -70,6 +70,7 @@ import org.slf4j.LoggerFactory; public class BrokersBase extends PulsarWebResource { private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10); + public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck"; @GET @Path("/{cluster}") @@ -317,7 +318,7 @@ public class BrokersBase extends PulsarWebResource { pulsar().getConfiguration()); - topic = String.format("persistent://%s/healthcheck", heartbeatNamespace); + topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX); LOG.info("Running healthCheck with topic={}", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 6e3173f..f1d7b06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -31,6 +31,11 @@ public class SystemTopic extends PersistentTopic { } @Override + public boolean isDeleteWhileInactive() { + return false; + } + + @Override public boolean isSizeBacklogExceeded() { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java index e838a69..122bd9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -201,7 +202,10 @@ public interface SystemTopicClient<T> { if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) { return true; } - + // health check topic + if (StringUtils.endsWith(localName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){ + return true; + } return false; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 94ac6e8..3b7d9aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; @@ -576,4 +578,51 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), brokerLevelPolicy)); } + + @Test(timeOut = 30000) + public void testInternalTopicInactiveNotClean() throws Exception { + conf.setSystemTopicEnabled(true); + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + super.baseSetup(); + // init topic + final String healthCheckTopic = "persistent://prop/ns-abc/"+ BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX; + final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"; + + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + + Producer<byte[]> heathCheckProducer = pulsarClient.newProducer() + .topic(healthCheckTopic) + .create(); + Consumer<byte[]> heathCheckConsumer = pulsarClient.newConsumer() + .topic(healthCheckTopic) + .subscriptionName("healthCheck") + .subscribe(); + + consumer.close(); + producer.close(); + heathCheckConsumer.close(); + heathCheckProducer.close(); + + Awaitility.await().untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(topic))); + Awaitility.await().untilAsserted(() -> { + Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic)); + }); + + admin.topics().deleteSubscription(topic, "sub"); + admin.topics().deleteSubscription(healthCheckTopic, "healthCheck"); + + Awaitility.await().untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc") + .contains(topic))); + Awaitility.await().pollDelay(2, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(healthCheckTopic))); + } }
