This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 17f23df1673 [fix][broker] fix delete_when_subscriptions_caught_up 
doesn't work while have active consumers (#18283)
17f23df1673 is described below

commit 17f23df1673de8d2e77798e016345ed8a6c05f15
Author: Penghui Li <[email protected]>
AuthorDate: Thu Nov 3 17:02:27 2022 +0800

    [fix][broker] fix delete_when_subscriptions_caught_up doesn't work while 
have active consumers (#18283)
    
    (cherry picked from commit 67d9d63882a7814077646ace80a387f58600f20f)
---
 .../broker/service/persistent/PersistentTopic.java | 180 +++++++++++----------
 .../broker/service/InactiveTopicDeleteTest.java    |   2 +-
 2 files changed, 95 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9491df7d446..df90547c723 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1106,6 +1106,9 @@ public class PersistentTopic extends AbstractTopic
      *            Flag indicating whether delete should succeed if topic still 
has unconnected subscriptions. Set to
      *            false when called from admin API (it will delete the subs 
too), and set to true when called from GC
      *            thread
+     * @param failIfHasBacklogs
+     *            Flag indicating whether delete should succeed if topic has 
backlogs. Set to false when called from
+     *            admin API (it will delete the subs too), and set to true 
when called from GC thread
      * @param closeIfClientsConnected
      *            Flag indicate whether explicitly close connected
      *            producers/consumers/replicators before trying to delete 
topic.
@@ -1127,103 +1130,108 @@ public class PersistentTopic extends AbstractTopic
             if (isClosingOrDeleting) {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
                 return FutureUtil.failedFuture(new TopicFencedException("Topic 
is already fenced"));
-            } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
-                return FutureUtil.failedFuture(
-                        new TopicBusyException("Topic has subscriptions: " + 
subscriptions.keys()));
-            } else if (failIfHasBacklogs && hasBacklogs()) {
-                List<String> backlogSubs =
-                        subscriptions.values().stream()
-                                .filter(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0)
-                                
.map(PersistentSubscription::getName).collect(Collectors.toList());
-                return FutureUtil.failedFuture(
-                        new TopicBusyException("Topic has subscriptions did 
not catch up: " + backlogSubs));
+            }
+            // We can proceed with the deletion if either:
+            //  1. No one is connected and no subscriptions
+            //  2. The topic have subscriptions but no backlogs for all 
subscriptions
+            //     if delete_when_no_subscriptions is applied
+            //  3. We want to kick out everyone and forcefully delete the 
topic.
+            //     In this case, we shouldn't care if the usageCount is 0 or 
not, just proceed
+            if (!closeIfClientsConnected) {
+                if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+                    return FutureUtil.failedFuture(
+                            new TopicBusyException("Topic has subscriptions: " 
+ subscriptions.keys()));
+                } else if (failIfHasBacklogs) {
+                    if (hasBacklogs()) {
+                        List<String> backlogSubs =
+                                subscriptions.values().stream()
+                                        .filter(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0)
+                                        
.map(PersistentSubscription::getName).collect(Collectors.toList());
+                        return FutureUtil.failedFuture(
+                                new TopicBusyException("Topic has 
subscriptions did not catch up: " + backlogSubs));
+                    } else if (!producers.isEmpty()) {
+                        return FutureUtil.failedFuture(new TopicBusyException(
+                                "Topic has " + producers.size() + " connected 
producers"));
+                    }
+                } else if (currentUsageCount() > 0) {
+                    return FutureUtil.failedFuture(new TopicBusyException(
+                            "Topic has " + currentUsageCount() + " connected 
producers/consumers"));
+                }
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
             CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
             if (closeIfClientsConnected) {
-                List<CompletableFuture<Void>> futures = Lists.newArrayList();
                 replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
                 producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
-                subscriptions.forEach((s, sub) -> 
futures.add(sub.disconnect()));
-                FutureUtil.waitForAll(futures).thenRun(() -> {
-                    closeClientFuture.complete(null);
-                }).exceptionally(ex -> {
-                    log.error("[{}] Error closing clients", topic, ex);
-                    unfenceTopicToResume();
-                    closeClientFuture.completeExceptionally(ex);
-                    return null;
-                });
-            } else {
-                closeClientFuture.complete(null);
             }
+            FutureUtil.waitForAll(futures).thenRun(() -> {
+                closeClientFuture.complete(null);
+            }).exceptionally(ex -> {
+                log.error("[{}] Error closing clients", topic, ex);
+                unfenceTopicToResume();
+                closeClientFuture.completeExceptionally(ex);
+                return null;
+            });
 
             closeClientFuture.thenAccept(delete -> {
-                // We can proceed with the deletion if either:
-                //  1. No one is connected
-                //  2. We want to kick out everyone and forcefully delete the 
topic.
-                //     In this case, we shouldn't care if the usageCount is 0 
or not, just proceed
-                if (currentUsageCount() ==  0 || (closeIfClientsConnected && 
!failIfHasSubscriptions)) {
-                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
-                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-
-                    deleteTopicAuthenticationFuture.thenCompose(
-                                    __ -> deleteSchema ? deleteSchema() : 
CompletableFuture.completedFuture(null))
-                            .thenCompose(__ -> deleteTopicPolicies())
-                            .thenCompose(__ -> 
transactionBufferCleanupAndClose())
-                            .whenComplete((v, ex) -> {
-                        if (ex != null) {
-                            log.error("[{}] Error deleting topic", topic, ex);
-                            unfenceTopicToResume();
-                            deleteFuture.completeExceptionally(ex);
-                        } else {
-                            List<CompletableFuture<Void>> subsDeleteFutures = 
new ArrayList<>();
-                            subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
-
-                            
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                if (e != null) {
-                                    log.error("[{}] Error deleting topic", 
topic, e);
-                                    unfenceTopicToResume();
-                                    deleteFuture.completeExceptionally(e);
-                                } else {
-                                    ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                        @Override
-                                        public void 
deleteLedgerComplete(Object ctx) {
-                                            
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                            unregisterTopicPolicyListener();
-
-                                            log.info("[{}] Topic deleted", 
topic);
-                                            deleteFuture.complete(null);
-                                        }
-
-                                        @Override
-                                        public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                            if (exception.getCause()
-                                                    instanceof 
MetadataStoreException.NotFoundException) {
-                                                log.info("[{}] Topic is 
already deleted {}",
-                                                        topic, 
exception.getMessage());
-                                                deleteLedgerComplete(ctx);
-                                            } else {
-                                                unfenceTopicToResume();
-                                                log.error("[{}] Error deleting 
topic", topic, exception);
-                                                
deleteFuture.completeExceptionally(new PersistenceException(exception));
+                CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
+                brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema 
? deleteSchema() :
+                                CompletableFuture.completedFuture(null))
+                        .thenCompose(__ -> deleteTopicPolicies())
+                        .thenCompose(__ -> transactionBufferCleanupAndClose())
+                        .whenComplete((v, ex) -> {
+                            if (ex != null) {
+                                log.error("[{}] Error deleting topic", topic, 
ex);
+                                unfenceTopicToResume();
+                                deleteFuture.completeExceptionally(ex);
+                            } else {
+                                List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+                                
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                    if (e != null) {
+                                        log.error("[{}] Error deleting topic", 
topic, e);
+                                        unfenceTopicToResume();
+                                        deleteFuture.completeExceptionally(e);
+                                    } else {
+                                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                            @Override
+                                            public void 
deleteLedgerComplete(Object ctx) {
+                                                
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+                                                
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                                
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                                
unregisterTopicPolicyListener();
+
+                                                log.info("[{}] Topic deleted", 
topic);
+                                                deleteFuture.complete(null);
                                             }
-                                        }
-                                    }, null);
-                                }
-                            });
-                        }
-                    });
-                } else {
-                    unfenceTopicToResume();
-                    deleteFuture.completeExceptionally(new TopicBusyException(
-                            "Topic has " + currentUsageCount() + " connected 
producers/consumers"));
-                }
+
+                                            @Override
+                                            public void 
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                           
Object ctx) {
+                                                if (exception.getCause()
+                                                        instanceof 
MetadataStoreException.NotFoundException) {
+                                                    log.info("[{}] Topic is 
already deleted {}",
+                                                            topic, 
exception.getMessage());
+                                                    deleteLedgerComplete(ctx);
+                                                } else {
+                                                    unfenceTopicToResume();
+                                                    log.error("[{}] Error 
deleting topic", topic, exception);
+                                                    
deleteFuture.completeExceptionally(
+                                                            new 
PersistenceException(exception));
+                                                }
+                                            }
+                                        }, null);
+                                    }
+                                });
+                            }
+                        });
             }).exceptionally(ex->{
                 unfenceTopicToResume();
                 deleteFuture.completeExceptionally(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 1a2dd423b90..9fa73ea1403 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -332,7 +332,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
             producer.send("Pulsar".getBytes());
         }
 
-        consumer.close();
         producer.close();
 
         Thread.sleep(2000);
@@ -342,6 +341,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         admin.topics().skipAllMessages(topic, "sub");
         Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
+        consumer.close();
     }
 
     @Test

Reply via email to