This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 49490b382fa [fix][broker] Fix the issue of topics possibly being deleted. (#21704)
49490b382fa is described below
commit 49490b382fa12295655804abd101dc07134b7449
Author: crossoverJie <[email protected]>
AuthorDate: Thu Dec 14 21:45:37 2023 +0800
[fix][broker] Fix the issue of topics possibly being deleted. (#21704)
Co-authored-by: Jiwe Guo <[email protected]>
(cherry picked from commit 84ea1ca05decbcb5d3a3bd1812e53ad10773b259)
---
.../broker/service/persistent/PersistentTopic.java | 9 +-
.../PersistentTopicInitializeDelayTest.java | 142 +++++++++++++++++++++
2 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d0de36c6242..4409114e013 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -296,8 +296,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
- registerTopicPolicyListener();
-
this.messageDeduplication = new
MessageDeduplication(brokerService.pulsar(), this, ledger);
if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
topicEpoch =
Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
@@ -1594,6 +1592,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
List<String> configuredClusters =
topicPolicies.getReplicationClusters().get();
+ if (CollectionUtils.isEmpty(configuredClusters)) {
+ log.warn("[{}] No replication clusters configured", name);
+ return CompletableFuture.completedFuture(null);
+ }
+
int newMessageTTLInSeconds =
topicPolicies.getMessageTTLInSeconds().get();
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3534,6 +3537,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
protected CompletableFuture<Void> initTopicPolicy() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ brokerService.getPulsar().getTopicPoliciesService()
+
.registerListener(TopicName.getPartitionedTopicName(topic), this);
return CompletableFuture.completedFuture(null).thenRunAsync(() ->
onUpdate(
brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
new file mode 100644
index 00000000000..ab8d4dbe5cc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Checks that a persistent topic carrying a topic-level policy can be loaded again after an unload
+ * while {@link MyPersistentTopic} delays the namespace-policy update and fails any replication check
+ * that observes a replication-cluster list other than exactly the local cluster.
+ */
+@Test(groups = "broker")
+@Slf4j
+public class PersistentTopicInitializeDelayTest extends BrokerTestBase {
+
+    /**
+     * Configures the broker to build topics through {@link MyTopicFactory} and starts it.
+     *
+     * @throws Exception if the base setup fails
+     */
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        conf.setAllowAutoTopicCreation(true);
+        conf.setManagedLedgerMaxEntriesPerLedger(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+        conf.setTopicLoadTimeoutSeconds(30);
+        super.baseSetup();
+    }
+
+    /**
+     * Releases everything created by {@link #setup()}.
+     *
+     * @throws Exception if the base cleanup fails
+     */
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Creates a topic, sets a topic-level policy on it, unloads it, and asserts that loading it
+     * again completes within 15 seconds and yields a topic.
+     *
+     * @throws Exception on any admin or load failure
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testTopicInitializeDelay() throws Exception {
+        admin.tenants().createTenant("public",
+                TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build());
+        String namespace = "public/initialize-delay";
+        admin.namespaces().createNamespace(namespace);
+        final String topicName = "persistent://" + namespace + "/testTopicInitializeDelay";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Make sure a topic-level policy exists before the topic is unloaded and loaded again.
+        admin.topicPolicies().setMaxConsumers(topicName, 10);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 10));
+        admin.topics().unload(topicName);
+        CompletableFuture<Optional<Topic>> optionalFuture =
+                pulsar.getBrokerService().getTopic(topicName, true);
+
+        Optional<Topic> topic = optionalFuture.get(15, TimeUnit.SECONDS);
+        assertTrue(topic.isPresent());
+    }
+
+    /**
+     * Topic factory that returns a plain {@link NonPersistentTopic} when that class is requested
+     * and a {@link MyPersistentTopic} for every other requested class.
+     */
+    public static class MyTopicFactory implements TopicFactory {
+        /**
+         * Builds the topic instance for the requested class.
+         *
+         * @throws IllegalStateException wrapping any exception raised by the topic constructors
+         */
+        @Override
+        @SuppressWarnings("unchecked") // T is selected by topicClazz; each branch builds the matching kind
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
+                                          Class<T> topicClazz) {
+            try {
+                if (topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                } else {
+                    return (T) new MyPersistentTopic(topic, ledger, brokerService);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+    }
+
+    /**
+     * {@link PersistentTopic} variant that applies the topic policies from its constructor when a
+     * listener is already registered for the topic, sleeps before applying namespace policies, and
+     * makes {@link #checkReplication()} stall and fail for the test topic when the configured
+     * replication clusters are not exactly the local cluster.
+     */
+    public static class MyPersistentTopic extends PersistentTopic {
+
+        /** Number of {@link #checkReplication()} calls seen for the test topic, across all instances. */
+        private static final AtomicInteger checkReplicationInvocationCount = new AtomicInteger(0);
+
+        public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
+            super(topic, ledger, brokerService);
+            TopicName topicName = TopicName.get(topic);
+            SystemTopicBasedTopicPoliciesService topicPoliciesService =
+                    (SystemTopicBasedTopicPoliciesService) brokerService.getPulsar().getTopicPoliciesService();
+            // Only push the policies when a listener for this topic is registered at construction time.
+            if (topicPoliciesService.getListeners().containsKey(topicName)) {
+                this.onUpdate(topicPoliciesService.getTopicPoliciesIfExists(topicName));
+            }
+        }
+
+        /** Waits 10 seconds, then delegates to the superclass implementation. */
+        @Override
+        protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
+            pause(10 * 1000);
+            super.updateTopicPolicyByNamespacePolicy(namespacePolicies);
+        }
+
+        /**
+         * For the topic whose local name is {@code testTopicInitializeDelay}: counts and logs the
+         * call, and if the configured replication clusters are not exactly the local cluster,
+         * sleeps 8 seconds and throws. In all other cases delegates to the superclass.
+         *
+         * @throws RuntimeException with message {@code "checkReplication error"} in the failing case
+         */
+        @Override
+        public CompletableFuture<Void> checkReplication() {
+            if (TopicName.get(topic).getLocalName().equalsIgnoreCase("testTopicInitializeDelay")) {
+                // Log the value returned by the increment so concurrent calls cannot skew the number.
+                int invocation = checkReplicationInvocationCount.incrementAndGet();
+                log.info("checkReplication, count = {}", invocation);
+                List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
+                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+                boolean onlyLocalCluster =
+                        configuredClusters.size() == 1 && configuredClusters.contains(localCluster);
+                if (!onlyLocalCluster) {
+                    // this will cause the get topic timeout.
+                    pause(8 * 1000);
+                    throw new RuntimeException("checkReplication error");
+                }
+            }
+            return super.checkReplication();
+        }
+
+        /**
+         * Sleeps for {@code millis} milliseconds. If the thread is interrupted, the interrupt flag
+         * is restored before the failure is rethrown so callers up the stack can still observe it.
+         */
+        private static void pause(long millis) {
+            try {
+                Thread.sleep(millis);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}