This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new f10df17f27e [fix] [broker] topics infinitely failed to delete after remove cluster from replicated clusters modifying when using partitioned system topic (#24097) f10df17f27e is described below commit f10df17f27e3cf44beefe239cfd166ded61d4d8c Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sat Mar 29 00:40:26 2025 +0800 [fix] [broker] topics infinitely failed to delete after remove cluster from replicated clusters modifying when using partitioned system topic (#24097) (cherry picked from commit abd51212839cec931a22b5ee293802a0b78175ec) --- .../SystemTopicBasedTopicPoliciesService.java | 21 ++- ...OneWayReplicatorUsingGlobalPartitionedTest.java | 207 +++++++++++++++++++++ 2 files changed, 226 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index f2206f5cada..7bf6119de34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -137,10 +137,27 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return CompletableFuture.completedFuture(null); } TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject()); - return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> { + CompletableFuture<Boolean> changeEventTopicExists = pulsarService.getPulsarResources().getTopicResources() + .persistentTopicExists(changeEvents).thenCompose(nonPartitionedExists -> { + if (!nonPartitionedExists) { + // To check whether partitioned __change_events exists. + // Instead of checking partitioned metadata, we check the first partition, because there is a case + // does not work if we choose checking partitioned metadata. + // The case's details: + // 1. Start 2 clusters: c1 and c2. + // 2. Enable replication between c1 and c2 with a global ZK. + // 3. The partitioned metadata was shared using by c1 and c2. + // 4. Pulsar only delete partitions when the topic is deleting from c1, because c2 is still using + // partitioned metadata. + return pulsarService.getPulsarResources().getTopicResources() + .persistentTopicExists(changeEvents.getPartition(0)); + } + return CompletableFuture.completedFuture(true); + }); + return changeEventTopicExists.thenCompose(exists -> { // If the system topic named "__change_events" has been deleted, it means all the data in the topic have // been deleted, so we do not need to delete the message that we want to delete again. - if (!topicExistsInfo.isExists()) { + if (!exists) { log.info("Skip delete topic-level policies because {} has been removed before", changeEvents); return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java new file mode 100644 index 00000000000..2a2a1befd16 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicatorTest { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.usingGlobalZK = true; + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + config.setDefaultNumPartitions(1); + } + + @Override + @Test(enabled = false) + public void testReplicatorProducerStatInTopic() throws Exception { + super.testReplicatorProducerStatInTopic(); + } + + @Override + @Test(enabled = false) + public void testCreateRemoteConsumerFirst() throws Exception { + super.testReplicatorProducerStatInTopic(); + } + + @Override + @Test(enabled = false) + public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception { + super.testReplicatorProducerStatInTopic(); + } + + @Override + @Test(enabled = false) + public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception { + super.testConcurrencyOfUnloadBundleAndRecreateProducer(); + } + + @Override + @Test(enabled = false) + public void testPartitionedTopicLevelReplication() throws Exception { + super.testPartitionedTopicLevelReplication(); + } + + @Override + @Test(enabled = false) + public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { + super.testPartitionedTopicLevelReplicationRemoteTopicExist(); + } + + @Override + @Test(enabled = false) + public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { + super.testPartitionedTopicLevelReplicationRemoteConflictTopicExist(); + } + + @Override + @Test(enabled = false) + public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception { + super.testConcurrencyOfUnloadBundleAndRecreateProducer2(); + } + + @Override + @Test(enabled = false) + public void testUnFenceTopicToReuse() throws Exception { + super.testUnFenceTopicToReuse(); + } + + @Override + @Test(enabled = false) + public void testDeleteNonPartitionedTopic() throws Exception { + super.testDeleteNonPartitionedTopic(); + } + + @Override + @Test(enabled = false) + public void testDeletePartitionedTopic() throws Exception { + super.testDeletePartitionedTopic(); + } + + @Override + @Test(enabled = false) + public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception { + super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication(); + } + + @Override + @Test(enabled = false) + public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception { + super.testExpandTopicPartitionsOnNamespaceLevelReplication(); + } + + @Override + @Test(enabled = false) + public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception { + super.testReloadWithTopicLevelGeoReplication(replicationLevel); + } + + @Test(enabled = false) + @Override + public void testConfigReplicationStartAt() throws Exception { + super.testConfigReplicationStartAt(); + } + + @Test(enabled = false) + @Override + public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception { + super.testDifferentTopicCreationRule(replicationMode); + } + + @Test(enabled = false) + @Override + public void testReplicationCountMetrics() throws Exception { + super.testReplicationCountMetrics(); + } + + @Test(timeOut = 60_000) + public void testRemoveCluster() throws Exception { + // Initialize. + final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8"; + final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a"; + final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0"; + admin1.namespaces().createNamespace(ns1); + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.topics().createNonPartitionedTopic(topic); + + // Wait for loading topic up. + Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create(); + Awaitility.await().untilAsserted(() -> { + Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics(); + assertTrue(tps.containsKey(topic)); + assertTrue(tps.containsKey(topicChangeEvents)); + }); + + // The topics under the namespace of the cluster-1 will be deleted. + // Verify the result. + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); + Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> { + Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics(); + assertFalse(tps.containsKey(topic)); + assertFalse(tps.containsKey(topicChangeEvents)); + assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)) + .get(5, TimeUnit.SECONDS).isExists()); + assertFalse(pulsar1.getNamespaceService() + .checkTopicExists(TopicName.get(topicChangeEvents)) + .get(5, TimeUnit.SECONDS).isExists()); + }); + + // cleanup. + p.close(); + admin2.topics().delete(topic); + admin2.namespaces().deleteNamespace(ns1); + } +}