This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a3c5feb78bd [fix][broker] The feature
brokerDeleteInactivePartitionedTopicMetadataEnabled leaves orphan topic
policies and topic schemas (#24150)
a3c5feb78bd is described below
commit a3c5feb78bd2330d00933def684f5e50bf6ff221
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 9 14:29:31 2025 +0800
[fix][broker] The feature
brokerDeleteInactivePartitionedTopicMetadataEnabled leaves orphan topic
policies and topic schemas (#24150)
---
conf/broker.conf | 5 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +-
.../broker/service/persistent/PersistentTopic.java | 10 +-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 14 ++
.../broker/service/OneWayReplicatorTestBase.java | 17 ++
.../broker/service/ReplicationTopicGcTest.java | 192 +++++++++++++++++++++
.../ReplicationTopicGcUsingGlobalZKTest.java | 63 +++++++
7 files changed, 304 insertions(+), 5 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 02fde814906..cfa7013cfc6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -218,8 +218,11 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
# Metadata of inactive partitioned topic will not be cleaned up automatically
by default.
-# Note: If `allowAutoTopicCreation` and this option are enabled at the same
time,
+# Note 1: If `allowAutoTopicCreation` and this option are enabled at the same
time,
# it may appear that a partitioned topic has just been deleted but is
automatically created as a non-partitioned topic.
+# Note 2: Activating bidirectional geo-replication under global configuration
ZooKeeper may lead to schema remnants and
+# abnormal topic-level policies.
+# Note 3: Activating bidirectional geo-replication under global configuration
ZooKeeper may lead to a consumption issue.
brokerDeleteInactivePartitionedTopicMetadataEnabled=false
# Max duration of topic inactivity in seconds, default is not present
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2b1b799fa20..0c2ac4635f1 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -694,9 +694,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Metadata of inactive partitioned topic will not be
automatically cleaned up by default.\n"
- + "Note: If `allowAutoTopicCreation` and this option are enabled
at the same time,\n"
+ + "Note 1: If `allowAutoTopicCreation` and this option are enabled
at the same time,\n"
+ "it may appear that a partitioned topic has just been deleted
but is automatically created as a "
- + "non-partitioned topic."
+ + "non-partitioned topic.\n"
+ + "Note 2: Activating bidirectional geo-replication under global
ZooKeeper configuration may lead to schema"
+ + " remnants and abnormal topic-level policies.\n"
+ + "Note 3: Activating bidirectional geo-replication under global
configuration ZooKeeper may lead"
+ + " to a consumption issue."
)
private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled =
false;
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 811388a692b..6738cfdce3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3246,8 +3246,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
String.format("Another partition exists for [%s].",
topicName));
} else {
- return
partitionedTopicResources
-
.deletePartitionedTopicAsync(topicName);
+ try {
+ return
brokerService.getPulsar().getAdminClient().topics()
+
.deletePartitionedTopicAsync(topicName.toString());
+ } catch
(PulsarServerException e) {
+ log.info("[{}] Delete
topic metadata failed due to failed to"
+ + " get
internal admin client.", topicName, e);
+ return
CompletableFuture.failedFuture(e);
+ }
}
});
}))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3f5ee721a7e..87c48e961db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3744,4 +3744,18 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(permissions11.isEmpty());
Assert.assertTrue(permissions22.isEmpty());
}
+
+ @Test
+ public void testDeletePatchyPartitionedTopic() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName(defaultNamespace +
"/tp");
+ admin.topics().createPartitionedTopic(topic, 2);
+ Producer producer =
pulsarClient.newProducer().topic(TopicName.get(topic).getPartition(0).toString())
+ .create();
+ // Mock a scenario in which "-partition-1" has been removed due to topic
GC.
+
pulsar.getBrokerService().getTopic(TopicName.get(topic).getPartition(1).toString(),
false)
+ .get().get().delete().join();
+ // Verify: delete partitioned topic.
+ producer.close();
+ admin.topics().deletePartitionedTopicAsync(topic, false).get();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 200c8dd3b3d..fdb01fc867a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -480,4 +480,21 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
admin2.topics().delete(topicName.toString());
}
}
+
+ protected void waitReplicatorStopped(PulsarService sourceCluster,
PulsarService targetCluster, String topicName) {
+ Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+ Optional<Topic> topicOptional2 =
targetCluster.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional2.isPresent());
+ PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
+ for (org.apache.pulsar.broker.service.Producer producer :
persistentTopic2.getProducers().values()) {
+ assertFalse(producer.getProducerName()
+
.startsWith(targetCluster.getConfiguration().getReplicatorPrefix()));
+ }
+ Optional<Topic> topicOptional1 =
sourceCluster.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional1.isPresent());
+ PersistentTopic persistentTopic1 = (PersistentTopic)
topicOptional1.get();
+ assertTrue(persistentTopic1.getReplicators().isEmpty()
+ ||
!persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
+ });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
new file mode 100644
index 00000000000..267e8547914
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ReplicationTopicGcTest extends OneWayReplicatorTestBase {
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @DataProvider(name = "topicTypes")
+ public Object[][] topicTypes() {
+ return new Object[][]{
+ {TopicType.NON_PARTITIONED},
+ {TopicType.PARTITIONED}
+ };
+ }
+
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+ super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);
+
config.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ config.setBrokerDeleteInactiveTopicsEnabled(true);
+ config.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(5);
+ config.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(5);
+ config.setReplicationPolicyCheckDurationSeconds(1);
+ }
+
+ @Test(dataProvider = "topicTypes")
+ public void testTopicGC(TopicType topicType) throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ final String schemaId = TopicName.get(topicName).getSchemaName();
+ final String subTopic = topicType == TopicType.NON_PARTITIONED ?
topicName
+ : TopicName.get(topicName).getPartition(0).toString();
+ if (topicType == TopicType.NON_PARTITIONED) {
+ admin1.topics().createNonPartitionedTopic(topicName);
+ } else {
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ }
+
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
+ // Wait for replicator started.
+ waitReplicatorStarted(subTopic);
+
+ // Set a topic-level policy on both clusters.
+ PublishRate publishRate = new PublishRate(1000, 1024 * 1024);
+ admin1.topicPolicies().setPublishRate(topicName, publishRate);
+ admin2.topicPolicies().setPublishRate(topicName, publishRate);
+ // Write a schema.
+ // Since a producer is already registered on the source cluster, the schema
write is skipped there.
+ admin2.schemas().createSchema(topicName,
Schema.STRING.getSchemaInfo());
+
+ // Trigger GC by closing all clients.
+ producer1.close();
+ // Verify: all resources were deleted.
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+ // sub topic.
+ assertFalse(pulsar1.getPulsarResources().getTopicResources()
+ .persistentTopicExists(TopicName.get(subTopic)).get());
+ assertFalse(pulsar2.getPulsarResources().getTopicResources()
+ .persistentTopicExists(TopicName.get(subTopic)).get());
+ // partitioned topic.
+ assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+ assertFalse(pulsar2.getPulsarResources().getNamespaceResources()
+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+ // topic policies.
+
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+ TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+
assertTrue(pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+ TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+ // schema.
+
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
+
assertTrue(CollectionUtils.isEmpty(pulsar2.getSchemaStorage().getAll(schemaId).get()));
+ });
+ }
+
+ @Test(dataProvider = "topicTypes")
+ public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType
topicType) throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ final String subscription = "s1";
+ final String schemaId = TopicName.get(topicName).getSchemaName();
+ final String subTopic = topicType == TopicType.NON_PARTITIONED ?
topicName
+ : TopicName.get(topicName).getPartition(0).toString();
+ if (topicType == TopicType.NON_PARTITIONED) {
+ admin1.topics().createNonPartitionedTopic(topicName);
+ } else {
+ admin1.topics().createPartitionedTopic(topicName, 1);
+ }
+
+ // Wait for replicator started.
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
+ waitReplicatorStarted(subTopic);
+ admin2.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+
+ if (usingGlobalZK) {
+ admin2.topics().setReplicationClusters(topicName,
Collections.singletonList(cluster2));
+ waitReplicatorStopped(pulsar2, pulsar1, subTopic);
+ }
+
+ // Send a message
+ producer1.send("msg-1");
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(admin2.topics().getStats(subTopic).getSubscriptions().get(subscription).getMsgBacklog(),
1);
+ });
+
+ // Trigger GC by closing all clients.
+ producer1.close();
+ // Verify: the topic was removed on the source cluster.
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+ // sub topic.
+ assertFalse(pulsar1.getPulsarResources().getTopicResources()
+ .persistentTopicExists(TopicName.get(subTopic)).get());
+ // partitioned topic.
+ assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
+ // topic policies.
+
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
+ TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+ // schema.
+
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
+ });
+
+ // Verify: consumer still can consume messages from the remote cluster.
+ org.apache.pulsar.client.api.Consumer<String> consumer =
+
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription).subscribe();
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.getValue(), "msg-1");
+
+ // Cleanup.
+ consumer.close();
+ if (topicType == TopicType.NON_PARTITIONED) {
+ admin2.topics().delete(topicName);
+ } else {
+ admin2.topics().deletePartitionedTopic(topicName);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
new file mode 100644
index 00000000000..ce6eddc5698
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ReplicationTopicGcUsingGlobalZKTest extends
ReplicationTopicGcTest {
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.usingGlobalZK = true;
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test(dataProvider = "topicTypes")
+ public void testTopicGC(TopicType topicType) throws Exception {
+ if (topicType.equals(TopicType.PARTITIONED)) {
+ // Pulsar does not support the feature
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
+ // Geo-Replication with Global ZK.
+ return;
+ }
+ super.testTopicGC(topicType);
+ }
+
+ @Test(dataProvider = "topicTypes")
+ public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType
topicType) throws Exception {
+ if (topicType.equals(TopicType.PARTITIONED)) {
+ // Pulsar does not support the feature
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
+ // Geo-Replication with Global ZK.
+ return;
+ }
+ super.testRemoteClusterStillConsumeAfterCurrentClusterGc(topicType);
+ }
+}