This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb863a4150fcbe7da10f5554b6aabb889b69b65d Author: fengyubiao <[email protected]> AuthorDate: Wed Apr 30 09:43:51 2025 +0800 [fix][broker] Orphan schema after disabled a cluster for a namespace (#24223) (cherry picked from commit 2d78cbddbcf921fed9649203a32d98839346ff60) --- .../broker/service/persistent/PersistentTopic.java | 54 +++++++++++++++++++++- ...OneWayReplicatorUsingGlobalPartitionedTest.java | 45 ++++++++++++++---- 2 files changed, 90 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 306f7a08fc1..5dd2909def8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -1928,7 +1929,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (!success) { // if local cluster is removed from global namespace cluster-list : then delete topic forcefully // because pulsar doesn't serve global topic without local repl-cluster configured. - return deleteForcefully(); + return deleteForcefully().thenCompose(ignore -> { + return deleteSchemaAndPoliciesIfClusterRemoved(); + }); } int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); @@ -1971,6 +1974,55 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal }); } + CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() { + TopicName tName = TopicName.get(topic); + if (!tName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } + TopicName partitionedName = TopicName.get(tName.getPartitionedTopicName()); + return brokerService.getPulsar().getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(partitionedName) + .thenApply(metadataOp -> { + if (metadataOp.isEmpty()) { + return null; + } + AtomicInteger checkedCounter = new AtomicInteger(metadataOp.get().partitions); + for (int i = 0; i < metadataOp.get().partitions; i++) { + brokerService.getPulsar().getPulsarResources().getTopicResources() + .persistentTopicExists(partitionedName.getPartition(i)).thenAccept(b -> { + if (!b) { + int leftPartitions = checkedCounter.decrementAndGet(); + log.info("[{}] partitions: {}, left: {}", tName, metadataOp.get().partitions, + leftPartitions); + if (leftPartitions == 0) { + brokerService.getPulsar().getSchemaStorage() + .delete(partitionedName.getSchemaName()) + .whenComplete((schemaVersion, ex) -> { + if (ex == null) { + log.info("Deleted schema[{}] after all partitions[{}] were removed" + + " because the current cluster has bee removed from" + + " topic/namespace policies", + partitionedName, metadataOp.get().partitions); + } else { + log.error("Failed to delete schema[{}] after all partitions[{}] were" + + " removed, when the current cluster has bee removed from" + + " topic/namespace policies", + partitionedName, metadataOp.get().partitions, ex); + } + + }); + // TODO regarding the topic level policies, it will be deleted at a seperate PR. + // Because there is an issue related to Global policies has not been solved so + // far. + } + } + }); + } + return null; + }); + } + private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) { List<String> replicationClusters = topicPolicies.getReplicationClusters().get(); return brokerService.pulsar().getPulsarResources().getNamespaceResources() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index a22067101c3..cc8be021a8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -33,6 +35,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.protocol.schema.StoredSchema; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.awaitility.Awaitility; @@ -174,17 +177,38 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator // Initialize. final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8"; final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a"; + final String topicP0 = TopicName.get(topic).getPartition(0).toString(); + final String topicP1 = TopicName.get(topic).getPartition(1).toString(); final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0"; admin1.namespaces().createNamespace(ns1); admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); - admin1.topics().createNonPartitionedTopic(topic); + admin1.topics().createPartitionedTopic(topic, 2); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(topic))); + List<CompletableFuture<StoredSchema>> schemaList11 + = pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList11.size(), 0); + List<CompletableFuture<StoredSchema>> schemaList21 + = pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList21.size(), 0); + }); - // Wait for loading topic up. + // Wait for copying messages. Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create(); + p.send("msg-1"); + p.close(); Awaitility.await().untilAsserted(() -> { Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics(); - assertTrue(tps.containsKey(topic)); + assertTrue(tps.containsKey(topicP0)); + assertTrue(tps.containsKey(topicP1)); assertTrue(tps.containsKey(topicChangeEvents)); + List<CompletableFuture<StoredSchema>> schemaList12 + = pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList12.size(), 1); + List<CompletableFuture<StoredSchema>> schemaList22 + = pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList12.size(), 1); }); // The topics under the namespace of the cluster-1 will be deleted. @@ -192,18 +216,23 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> { Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics(); - assertFalse(tps.containsKey(topic)); + assertFalse(tps.containsKey(topicP0)); + assertFalse(tps.containsKey(topicP1)); assertFalse(tps.containsKey(topicChangeEvents)); - assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)) - .get(5, TimeUnit.SECONDS).isExists()); assertFalse(pulsar1.getNamespaceService() .checkTopicExists(TopicName.get(topicChangeEvents)) .get(5, TimeUnit.SECONDS).isExists()); + // Verify: schema will be removed in local cluster, and remote cluster will not. + List<CompletableFuture<StoredSchema>> schemaList13 + = pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList13.size(), 0); + List<CompletableFuture<StoredSchema>> schemaList23 + = pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); + assertEquals(schemaList23.size(), 1); }); // cleanup. - p.close(); - admin2.topics().delete(topic); + admin2.topics().deletePartitionedTopic(topic); admin2.namespaces().deleteNamespace(ns1); } }
