This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5a237a3347972a641d5c63ec8f78b168586f99e6 Author: fengyubiao <[email protected]> AuthorDate: Mon Nov 17 09:42:22 2025 +0800 [improve][broker]Improve error response of failed to delete topic if it has replicators connected (#24975) (cherry picked from commit 1555607fa29f9427f04b08343d8ceb31f1c75693) --- .../broker/admin/impl/PersistentTopicsBase.java | 3 +-- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 19 +++++++++++-- .../pulsar/broker/TopicEventsListenerTest.java | 3 +-- .../broker/service/OneWayReplicatorTest.java | 31 ++++++++++++++++++++++ ...OneWayReplicatorUsingGlobalPartitionedTest.java | 5 ++++ .../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++++- .../java/org/apache/pulsar/schema/SchemaTest.java | 8 +++--- 8 files changed, 65 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 27d605f81d1..c7602f814b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -796,8 +796,7 @@ public class PersistentTopicsBase extends AdminResource { Throwable realCause = FutureUtil.unwrapCompletionException(ex); if (realCause instanceof PreconditionFailedException) { asyncResponse.resume( - new RestException(Status.PRECONDITION_FAILED, - "Topic has active producers/subscriptions")); + new RestException(Status.PRECONDITION_FAILED, realCause.getMessage())); } else if (realCause instanceof WebApplicationException){ asyncResponse.resume(realCause); } else if (realCause instanceof MetadataStoreException.NotFoundException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index aa6be2eaab0..5e290d08c2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2831,7 +2831,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { .whenComplete(((txnID, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { - log.debug("Send response {} for new txn request {}", tcId.getId(), requestId); + log.debug("Send response {} for new txn request {}", txnID, requestId); } commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId()); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bd503f896bf..7a8c047b244 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1511,8 +1511,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal "Topic has " + producers.size() + " connected producers")); } } else if (currentUsageCount() > 0) { - return FutureUtil.failedFuture(new TopicBusyException( - "Topic has " + currentUsageCount() + " connected producers/consumers")); + StringBuilder errorMsg = new StringBuilder("Topic has"); + errorMsg.append(" ").append(currentUsageCount()) + .append(currentUsageCount() == 1 ? " client" : " clients").append(" connected"); + long consumerCount = subscriptions.values().stream().map(sub -> sub.getConsumers().size()) + .reduce(0, Integer::sum); + long replicatorCount = 0; + long producerCount = 0; + if (!producers.isEmpty()) { + replicatorCount = producers.values().stream().filter(Producer::isRemote).count(); + if (producers.size() > replicatorCount) { + producerCount = producers.size() - replicatorCount; + } + } + errorMsg.append(" Including").append(" ").append(consumerCount).append(" consumers,") + .append(" ").append(producerCount).append(" producers,").append(" and") + .append(" ").append(replicatorCount).append(" replicators."); + return FutureUtil.failedFuture(new TopicBusyException(errorMsg.toString())); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index fb45094c569..41da87dd165 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -201,8 +201,7 @@ public class TopicEventsListenerTest extends BrokerTestBase { if (forceDelete) { throw e; } - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") - || e.getMessage().contains("connected producers/consumers")); + assertTrue(e.getMessage().contains("Topic has")); } final String[] expectedEvents; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index d7b8d71a5a4..8a80f57f5b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -139,6 +139,37 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { super.cleanup(); } + @Test(timeOut = 45 * 1000) + public void testDeleteTopicWhenReplicating() throws Exception { + final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + Producer<byte[]> producer1 = client1.newProducer().topic(topicName1).create(); + waitReplicatorStarted(topicName1); + try { + admin2.topics().delete(topicName1); + fail("Should fail to delete topic when replicating"); + } catch (PulsarAdminException.PreconditionFailedException ex) { + assertTrue(ex.getMessage().contains("1 replicators")); + } + + final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createPartitionedTopic(topicName2, 1); + Producer<byte[]> producer2 = client1.newProducer().topic(topicName2).create(); + waitReplicatorStarted(TopicName.get(topicName2).getPartition(0).toString()); + try { + admin2.topics().deletePartitionedTopic(topicName2); + fail("Should fail to delete topic when replicating"); + } catch (PulsarAdminException.PreconditionFailedException ex) { + assertTrue(ex.getMessage().contains("1 replicators")); + } + + producer1.close(); + producer2.close(); + cleanupTopics(() -> { + admin1.topics().delete(topicName1); + admin2.topics().deletePartitionedTopic(topicName2); + }); + } + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index d862636f0b2..3b719551b95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -73,6 +73,11 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator config.setDefaultNumPartitions(1); } + @Test(enabled = false) + public void testDeleteTopicWhenReplicating() throws Exception { + super.testDeleteTopicWhenReplicating(); + } + @Override @Test(enabled = false) public void testReplicatorProducerStatInTopic() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index a0db3c8e438..60672845b5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -83,8 +83,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { config.setTransactionCoordinatorEnabled(true); } + @Test(enabled = false) + public void testDeleteTopicWhenReplicating() throws Exception { + super.testDeleteTopicWhenReplicating(); + } - @Test(enabled = false) + @Test(enabled = false) public void testReplicatorProducerStatInTopic() throws Exception { super.testReplicatorProducerStatInTopic(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 2930256a75c..9bf1f5e4048 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -962,7 +962,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } catch (Exception e) { assertThat(e.getMessage()) .isNotNull() - .startsWith("Topic has 2 connected producers/consumers"); + .startsWith("Topic has 2 clients"); } assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2); @@ -972,7 +972,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } catch (Exception e) { assertThat(e.getMessage()) .isNotNull() - .startsWith("Topic has active producers/subscriptions"); + .startsWith("Topic has 1 client"); } assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1); @@ -1055,7 +1055,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { admin.topics().delete(topicOne, false); fail(); } catch (Exception e) { - assertTrue(e.getMessage().startsWith("Topic has 2 connected producers/consumers")); + assertTrue(e.getMessage().startsWith("Topic has 2 clients")); } assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2); @@ -1063,7 +1063,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(topicTwo, false); fail(); } catch (Exception e) { - assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().startsWith("Topic has 1 client")); } assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(), 1);
