This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 29190da1b433d6949c5f5a90f6c9500f2713e5db Author: fengyubiao <[email protected]> AuthorDate: Tue Jun 17 14:54:08 2025 +0800 [fix][broker] Once the cluster is configured incorrectly, the broker maintains the incorrect cluster configuration even if you removed it (#24419) (cherry picked from commit e157cac11c72fe28d40d5730b2b27f9923f76b9f) --- .../org/apache/pulsar/broker/service/BrokerService.java | 6 ++++++ .../broker/service/ReplicatorRemoveClusterTest.java | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1e4555c5d8b..0b056c8fb86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -792,6 +792,12 @@ public class BrokerService implements Closeable { return CompletableFuture.completedFuture(null); } return client.closeAsync(); + }).thenCompose(__ -> { + PulsarAdmin pulsarAdmin = clusterAdmins.remove(clusterName); + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + return CompletableFuture.completedFuture(null); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java index c61efdff26d..4a74966a3b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java @@ -23,8 +23,10 @@ import java.lang.reflect.Method; import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; import org.testng.Assert; @@ -88,6 +90,7 @@ public class ReplicatorRemoveClusterTest extends ReplicatorTestBase { .build(); final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID(); + admin1.topics().createPartitionedTopic(topicName, 1); Producer<byte[]> producer = client.newProducer() .topic(topicName) @@ -98,9 +101,12 @@ public class ReplicatorRemoveClusterTest extends ReplicatorTestBase { producer.close(); client.close(); - Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName) + Replicator replicator = pulsar1.getBrokerService() + .getTopicReference(TopicName.get(topicName).getPartition(0).toString()) .get().getReplicators().get("r3"); + PulsarAdmin replicatorAdmin = pulsar1.getBrokerService().getClusterAdmins().get("r3"); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); admin1.clusters().deleteCluster("r3"); @@ -110,5 +116,13 @@ public class ReplicatorRemoveClusterTest extends ReplicatorTestBase { Awaitility.await().untilAsserted(() -> Assert.assertNull( pulsar1.getBrokerService().getReplicationClients().get("r3"))); + Awaitility.await().untilAsserted(() -> Assert.assertNull( + pulsar1.getBrokerService().getClusterAdmins().get("r3"))); + try { + replicatorAdmin.clusters().getClusters(); + Assert.fail("Should get an error that pulsarAdmin has been closed"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("has been closed")); + } } }
