This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1555607fa29 [improve][broker]Improve error response of failed to
delete topic if it has replicators connected (#24975)
1555607fa29 is described below
commit 1555607fa29f9427f04b08343d8ceb31f1c75693
Author: fengyubiao <[email protected]>
AuthorDate: Mon Nov 17 09:42:22 2025 +0800
[improve][broker]Improve error response of failed to delete topic if it has
replicators connected (#24975)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +--
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 19 +++++++++++--
.../pulsar/broker/TopicEventsListenerTest.java | 3 +--
.../broker/service/OneWayReplicatorTest.java | 31 ++++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 5 ++++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++++-
.../java/org/apache/pulsar/schema/SchemaTest.java | 8 +++---
8 files changed, 65 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9318246afc1..5a365b13995 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -796,8 +796,7 @@ public class PersistentTopicsBase extends AdminResource {
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof PreconditionFailedException) {
asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED,
- "Topic has active
producers/subscriptions"));
+ new RestException(Status.PRECONDITION_FAILED,
realCause.getMessage()));
} else if (realCause instanceof WebApplicationException){
asyncResponse.resume(realCause);
} else if (realCause instanceof
MetadataStoreException.NotFoundException) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 55cd0fe7e3d..a539f437e7f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2838,7 +2838,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
- log.debug("Send response {} for new txn request {}",
tcId.getId(), requestId);
+ log.debug("Send response {} for new txn request {}",
txnID, requestId);
}
commandSender.sendNewTxnResponse(requestId, txnID,
tcId.getId());
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f320f8a6e7b..23133cccea0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1520,8 +1520,23 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
"Topic has " + producers.size() + " connected
producers"));
}
} else if (currentUsageCount() > 0) {
- return FutureUtil.failedFuture(new TopicBusyException(
- "Topic has " + currentUsageCount() + " connected
producers/consumers"));
+ StringBuilder errorMsg = new StringBuilder("Topic has");
+ errorMsg.append(" ").append(currentUsageCount())
+ .append(currentUsageCount() == 1 ? " client" : "
clients").append(" connected");
+ long consumerCount =
subscriptions.values().stream().map(sub -> sub.getConsumers().size())
+ .reduce(0, Integer::sum);
+ long replicatorCount = 0;
+ long producerCount = 0;
+ if (!producers.isEmpty()) {
+ replicatorCount =
producers.values().stream().filter(Producer::isRemote).count();
+ if (producers.size() > replicatorCount) {
+ producerCount = producers.size() - replicatorCount;
+ }
+ }
+ errorMsg.append(" Including").append("
").append(consumerCount).append(" consumers,")
+ .append(" ").append(producerCount).append("
producers,").append(" and")
+ .append(" ").append(replicatorCount).append("
replicators.");
+ return FutureUtil.failedFuture(new
TopicBusyException(errorMsg.toString()));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index fb45094c569..41da87dd165 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -201,8 +201,7 @@ public class TopicEventsListenerTest extends BrokerTestBase
{
if (forceDelete) {
throw e;
}
- assertTrue(e.getMessage().contains("Topic has active
producers/subscriptions")
- || e.getMessage().contains("connected
producers/consumers"));
+ assertTrue(e.getMessage().contains("Topic has"));
}
final String[] expectedEvents;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index d7b8d71a5a4..8a80f57f5b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -139,6 +139,37 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
+ @Test(timeOut = 45 * 1000)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName1).create();
+ waitReplicatorStarted(topicName1);
+ try {
+ admin2.topics().delete(topicName1);
+ fail("Should fail to delete topic when replicating");
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
+ assertTrue(ex.getMessage().contains("1 replicators"));
+ }
+
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ admin1.topics().createPartitionedTopic(topicName2, 1);
+ Producer<byte[]> producer2 =
client1.newProducer().topic(topicName2).create();
+
waitReplicatorStarted(TopicName.get(topicName2).getPartition(0).toString());
+ try {
+ admin2.topics().deletePartitionedTopic(topicName2);
+ fail("Should fail to delete topic when replicating");
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
+ assertTrue(ex.getMessage().contains("1 replicators"));
+ }
+
+ producer1.close();
+ producer2.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName1);
+ admin2.topics().deletePartitionedTopic(topicName2);
+ });
+ }
+
@Test(timeOut = 45 * 1000)
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index d862636f0b2..3b719551b95 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -73,6 +73,11 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
config.setDefaultNumPartitions(1);
}
+ @Test(enabled = false)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
+
@Override
@Test(enabled = false)
public void testReplicatorProducerStatInTopic() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index a0db3c8e438..60672845b5e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -83,8 +83,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
config.setTransactionCoordinatorEnabled(true);
}
+ @Test(enabled = false)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
- @Test(enabled = false)
+ @Test(enabled = false)
public void testReplicatorProducerStatInTopic() throws Exception {
super.testReplicatorProducerStatInTopic();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 2930256a75c..9bf1f5e4048 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -962,7 +962,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest
{
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
- .startsWith("Topic has 2 connected producers/consumers");
+ .startsWith("Topic has 2 clients");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(),
2);
@@ -972,7 +972,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest
{
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
- .startsWith("Topic has active producers/subscriptions");
+ .startsWith("Topic has 1 client");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(),
1);
@@ -1055,7 +1055,7 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(topicOne, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has 2 connected
producers/consumers"));
+ assertTrue(e.getMessage().startsWith("Topic has 2 clients"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(),
2);
@@ -1063,7 +1063,7 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(topicTwo, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has active
producers/subscriptions"));
+ assertTrue(e.getMessage().startsWith("Topic has 1 client"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(),
1);