This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 08c7992ca84 [improve][broker] Improve naming for delete topic error (#16965)
08c7992ca84 is described below
commit 08c7992ca846ad86ed2fd2d7270c1913d1998fbd
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Sun Aug 14 18:17:31 2022 +0800
[improve][broker] Improve naming for delete topic error (#16965)
(cherry picked from commit d3dd143d109aba92a2144808b084fc34397c782e)
---
.../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../broker/service/nonpersistent/NonPersistentTopic.java | 3 ++-
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 ++++++++--
.../apache/pulsar/broker/service/BacklogQuotaManagerTest.java | 5 +++--
.../src/test/java/org/apache/pulsar/schema/SchemaTest.java | 2 +-
5 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 73b70b21d8c..14d7569881a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1029,7 +1029,7 @@ public class PersistentTopicsBase extends AdminResource {
Throwable t = e.getCause();
log.error("[{}] Failed to delete topic {}", clientAppId(),
topicName, t);
if (t instanceof TopicBusyException) {
- throw new RestException(Status.PRECONDITION_FAILED, "Topic has
active producers/subscriptions");
+ throw new RestException(Status.PRECONDITION_FAILED,
t.getMessage());
} else if (isManagedLedgerNotFoundException(e)) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 35b900c297c..f032e2415e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -389,7 +389,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
- deleteFuture.completeExceptionally(new
TopicBusyException("Topic has subscriptions"));
+ deleteFuture.completeExceptionally(
+ new TopicBusyException("Topic has
subscriptions:" + subscriptions.keys()));
return;
}
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 95d133d4d16..05b1078d843 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1128,9 +1128,15 @@ public class PersistentTopic extends AbstractTopic
log.warn("[{}] Topic is already being closed or deleted",
topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic
is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
- return FutureUtil.failedFuture(new TopicBusyException("Topic
has subscriptions"));
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions: " +
subscriptions.keys()));
} else if (failIfHasBacklogs && hasBacklogs()) {
- return FutureUtil.failedFuture(new TopicBusyException("Topic
has subscriptions did not catch up"));
+ List<String> backlogSubs =
+ subscriptions.values().stream()
+ .filter(sub ->
sub.getNumberOfEntriesInBacklog(false) > 0)
+
.map(PersistentSubscription::getName).collect(Collectors.toList());
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions did
not catch up: " + backlogSubs));
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while
deleting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3d549cc712a..fbbfbfb40b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -549,7 +550,7 @@ public class BacklogQuotaManagerTest {
consumer2.receive();
}
- TopicStats stats = getTopicStats(topic1);
+ TopicStats stats = admin.topics().getStats(topic1, true);
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(),
5);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(),
5);
@@ -566,7 +567,7 @@ public class BacklogQuotaManagerTest {
Thread.sleep(2000L);
rolloverStats();
- TopicStats stats2 = getTopicStats(topic1);
+ TopicStats stats2 = admin.topics().getStats(topic1, true);
// The first 5 messages should be expired due to limit time is 5
seconds, and the last 9 message should not.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 1a0921a8aea..e4c1b098ff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -793,7 +793,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest
{
admin.topics().delete(topic1, false, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has active
producers/subscriptions"));
+ assertTrue(e.getMessage().startsWith("Topic has 2 connected
producers/consumers"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(),
2);