This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 49490b382fa [fix][broker] Fix the issue of topics possibly being deleted. (#21704)
49490b382fa is described below

commit 49490b382fa12295655804abd101dc07134b7449
Author: crossoverJie <[email protected]>
AuthorDate: Thu Dec 14 21:45:37 2023 +0800

    [fix][broker] Fix the issue of topics possibly being deleted. (#21704)
    
    Co-authored-by: Jiwe Guo <[email protected]>
    (cherry picked from commit 84ea1ca05decbcb5d3a3bd1812e53ad10773b259)
---
 .../broker/service/persistent/PersistentTopic.java |   9 +-
 .../PersistentTopicInitializeDelayTest.java        | 142 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d0de36c6242..4409114e013 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -296,8 +296,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 .build();
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
-        registerTopicPolicyListener();
-
         this.messageDeduplication = new 
MessageDeduplication(brokerService.pulsar(), this, ledger);
         if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
             topicEpoch = 
Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
@@ -1594,6 +1592,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         List<String> configuredClusters = 
topicPolicies.getReplicationClusters().get();
+        if (CollectionUtils.isEmpty(configuredClusters)) {
+            log.warn("[{}] No replication clusters configured", name);
+            return CompletableFuture.completedFuture(null);
+        }
+
         int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
 
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3534,6 +3537,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     protected CompletableFuture<Void> initTopicPolicy() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    
.registerListener(TopicName.getPartitionedTopicName(topic), this);
             return CompletableFuture.completedFuture(null).thenRunAsync(() -> 
onUpdate(
                             brokerService.getPulsar().getTopicPoliciesService()
                                     
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
new file mode 100644
index 00000000000..ab8d4dbe5cc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "broker")
+@Slf4j
+public class PersistentTopicInitializeDelayTest extends BrokerTestBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        conf.setAllowAutoTopicCreation(true);
+        conf.setManagedLedgerMaxEntriesPerLedger(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+        conf.setTopicLoadTimeoutSeconds(30);
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testTopicInitializeDelay() throws Exception {
+        admin.tenants().createTenant("public", 
TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build());
+        String namespace = "public/initialize-delay";
+        admin.namespaces().createNamespace(namespace);
+        final String topicName = "persistent://" + namespace + 
"/testTopicInitializeDelay";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        admin.topicPolicies().setMaxConsumers(topicName, 10);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 10));
+        admin.topics().unload(topicName);
+        CompletableFuture<Optional<Topic>> optionalFuture = 
pulsar.getBrokerService().getTopic(topicName, true);
+
+        Optional<Topic> topic = optionalFuture.get(15, TimeUnit.SECONDS);
+        assertTrue(topic.isPresent());
+    }
+
+    public static class MyTopicFactory implements TopicFactory {
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+                                          Class<T> topicClazz) {
+            try {
+                if (topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                } else {
+                    return (T) new MyPersistentTopic(topic, ledger, 
brokerService);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+    }
+
+    public static class MyPersistentTopic extends PersistentTopic {
+
+        private static AtomicInteger checkReplicationInvocationCount = new 
AtomicInteger(0);
+
+        public MyPersistentTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService) {
+            super(topic, ledger, brokerService);
+            SystemTopicBasedTopicPoliciesService topicPoliciesService =
+                    (SystemTopicBasedTopicPoliciesService) 
brokerService.getPulsar().getTopicPoliciesService();
+            if 
(topicPoliciesService.getListeners().containsKey(TopicName.get(topic)) ) {
+                
this.onUpdate(brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topic)));
+            }
+        }
+
+        protected void updateTopicPolicyByNamespacePolicy(Policies 
namespacePolicies) {
+            try {
+                Thread.sleep(10 * 1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            super.updateTopicPolicyByNamespacePolicy(namespacePolicies);
+        }
+
+        public CompletableFuture<Void> checkReplication() {
+            if 
(TopicName.get(topic).getLocalName().equalsIgnoreCase("testTopicInitializeDelay"))
 {
+                checkReplicationInvocationCount.incrementAndGet();
+                log.info("checkReplication, count = {}", 
checkReplicationInvocationCount.get());
+                List<String> configuredClusters = 
topicPolicies.getReplicationClusters().get();
+                if (!(configuredClusters.size() == 1 && 
configuredClusters.contains(brokerService.pulsar().getConfiguration().getClusterName())))
 {
+                    try {
+                        // this will cause the get topic timeout.
+                        Thread.sleep(8 * 1000);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    throw new RuntimeException("checkReplication error");
+                }
+            }
+            return super.checkReplication();
+        }
+    }
+}

Reply via email to