This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 17f23df1673 [fix][broker] fix delete_when_subscriptions_caught_up
doesn't work while have active consumers (#18283)
17f23df1673 is described below
commit 17f23df1673de8d2e77798e016345ed8a6c05f15
Author: Penghui Li <[email protected]>
AuthorDate: Thu Nov 3 17:02:27 2022 +0800
[fix][broker] fix delete_when_subscriptions_caught_up doesn't work while
have active consumers (#18283)
(cherry picked from commit 67d9d63882a7814077646ace80a387f58600f20f)
---
.../broker/service/persistent/PersistentTopic.java | 180 +++++++++++----------
.../broker/service/InactiveTopicDeleteTest.java | 2 +-
2 files changed, 95 insertions(+), 87 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9491df7d446..df90547c723 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1106,6 +1106,9 @@ public class PersistentTopic extends AbstractTopic
* Flag indicating whether delete should succeed if topic still
has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs
too), and set to true when called from GC
* thread
+ * @param failIfHasBacklogs
+ * Flag indicating whether delete should succeed if topic has
backlogs. Set to false when called from
+ * admin API (it will delete the subs too), and set to true
when called from GC thread
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected
* producers/consumers/replicators before trying to delete
topic.
@@ -1127,103 +1130,108 @@ public class PersistentTopic extends AbstractTopic
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted",
topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic
is already fenced"));
- } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
- return FutureUtil.failedFuture(
- new TopicBusyException("Topic has subscriptions: " +
subscriptions.keys()));
- } else if (failIfHasBacklogs && hasBacklogs()) {
- List<String> backlogSubs =
- subscriptions.values().stream()
- .filter(sub ->
sub.getNumberOfEntriesInBacklog(false) > 0)
-
.map(PersistentSubscription::getName).collect(Collectors.toList());
- return FutureUtil.failedFuture(
- new TopicBusyException("Topic has subscriptions did
not catch up: " + backlogSubs));
+ }
+ // We can proceed with the deletion if either:
+ // 1. No one is connected and no subscriptions
+ // 2. The topic have subscriptions but no backlogs for all
subscriptions
+ // if delete_when_no_subscriptions is applied
+ // 3. We want to kick out everyone and forcefully delete the
topic.
+ // In this case, we shouldn't care if the usageCount is 0 or
not, just proceed
+ if (!closeIfClientsConnected) {
+ if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions: "
+ subscriptions.keys()));
+ } else if (failIfHasBacklogs) {
+ if (hasBacklogs()) {
+ List<String> backlogSubs =
+ subscriptions.values().stream()
+ .filter(sub ->
sub.getNumberOfEntriesInBacklog(false) > 0)
+
.map(PersistentSubscription::getName).collect(Collectors.toList());
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has
subscriptions did not catch up: " + backlogSubs));
+ } else if (!producers.isEmpty()) {
+ return FutureUtil.failedFuture(new TopicBusyException(
+ "Topic has " + producers.size() + " connected
producers"));
+ }
+ } else if (currentUsageCount() > 0) {
+ return FutureUtil.failedFuture(new TopicBusyException(
+ "Topic has " + currentUsageCount() + " connected
producers/consumers"));
+ }
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while
deleting
CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (closeIfClientsConnected) {
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
producers.values().forEach(producer ->
futures.add(producer.disconnect()));
- subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
- FutureUtil.waitForAll(futures).thenRun(() -> {
- closeClientFuture.complete(null);
- }).exceptionally(ex -> {
- log.error("[{}] Error closing clients", topic, ex);
- unfenceTopicToResume();
- closeClientFuture.completeExceptionally(ex);
- return null;
- });
- } else {
- closeClientFuture.complete(null);
}
+ FutureUtil.waitForAll(futures).thenRun(() -> {
+ closeClientFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}] Error closing clients", topic, ex);
+ unfenceTopicToResume();
+ closeClientFuture.completeExceptionally(ex);
+ return null;
+ });
closeClientFuture.thenAccept(delete -> {
- // We can proceed with the deletion if either:
- // 1. No one is connected
- // 2. We want to kick out everyone and forcefully delete the
topic.
- // In this case, we shouldn't care if the usageCount is 0
or not, just proceed
- if (currentUsageCount() == 0 || (closeIfClientsConnected &&
!failIfHasSubscriptions)) {
- CompletableFuture<Void> deleteTopicAuthenticationFuture =
new CompletableFuture<>();
- brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
-
- deleteTopicAuthenticationFuture.thenCompose(
- __ -> deleteSchema ? deleteSchema() :
CompletableFuture.completedFuture(null))
- .thenCompose(__ -> deleteTopicPolicies())
- .thenCompose(__ ->
transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>> subsDeleteFutures =
new ArrayList<>();
- subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
-
-
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
- if (e != null) {
- log.error("[{}] Error deleting topic",
topic, e);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(e);
- } else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void
deleteLedgerComplete(Object ctx) {
-
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
- unregisterTopicPolicyListener();
-
- log.info("[{}] Topic deleted",
topic);
- deleteFuture.complete(null);
- }
-
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- if (exception.getCause()
- instanceof
MetadataStoreException.NotFoundException) {
- log.info("[{}] Topic is
already deleted {}",
- topic,
exception.getMessage());
- deleteLedgerComplete(ctx);
- } else {
- unfenceTopicToResume();
- log.error("[{}] Error deleting
topic", topic, exception);
-
deleteFuture.completeExceptionally(new PersistenceException(exception));
+ CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema
? deleteSchema() :
+ CompletableFuture.completedFuture(null))
+ .thenCompose(__ -> deleteTopicPolicies())
+ .thenCompose(__ -> transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic", topic,
ex);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting topic",
topic, e);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void
deleteLedgerComplete(Object ctx) {
+
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+
unregisterTopicPolicyListener();
+
+ log.info("[{}] Topic deleted",
topic);
+ deleteFuture.complete(null);
}
- }
- }, null);
- }
- });
- }
- });
- } else {
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(new TopicBusyException(
- "Topic has " + currentUsageCount() + " connected
producers/consumers"));
- }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception,
+
Object ctx) {
+ if (exception.getCause()
+ instanceof
MetadataStoreException.NotFoundException) {
+ log.info("[{}] Topic is
already deleted {}",
+ topic,
exception.getMessage());
+ deleteLedgerComplete(ctx);
+ } else {
+ unfenceTopicToResume();
+ log.error("[{}] Error
deleting topic", topic, exception);
+
deleteFuture.completeExceptionally(
+ new
PersistenceException(exception));
+ }
+ }
+ }, null);
+ }
+ });
+ }
+ });
}).exceptionally(ex->{
unfenceTopicToResume();
deleteFuture.completeExceptionally(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 1a2dd423b90..9fa73ea1403 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -332,7 +332,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
producer.send("Pulsar".getBytes());
}
- consumer.close();
producer.close();
Thread.sleep(2000);
@@ -342,6 +341,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
admin.topics().skipAllMessages(topic, "sub");
Awaitility.await()
.untilAsserted(() ->
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
+ consumer.close();
}
@Test