This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8de67dc1dd6 [fix][broker] fix delete_when_subscriptions_caught_up 
doesn't work while have active consumers (#18320)
8de67dc1dd6 is described below

commit 8de67dc1dd625e4e73673df20732b149eccee26f
Author: Penghui Li <[email protected]>
AuthorDate: Sat Nov 5 08:45:43 2022 +0800

    [fix][broker] fix delete_when_subscriptions_caught_up doesn't work while 
have active consumers (#18320)
---
 .github/workflows/ci-cpp-build.yaml                |   1 +
 .github/workflows/ci-cpp.yaml                      |   1 +
 .github/workflows/pulsar-ci-flaky.yaml             |   1 +
 .github/workflows/pulsar-ci.yaml                   |   1 +
 .../broker/service/persistent/PersistentTopic.java | 179 +++++++++++----------
 .../broker/service/InactiveTopicDeleteTest.java    |   2 +-
 .../client/api/SimpleProducerConsumerStatTest.java |   1 +
 .../pulsar/client/api/MultiTopicConsumerStats.java |   2 +-
 .../client/api/PartitionedTopicProducerStats.java  |   2 +-
 .../impl/MultiTopicConsumerStatsRecorderImpl.java  |   2 +-
 .../PartitionedTopicProducerStatsRecorderImpl.java |   2 +-
 11 files changed, 104 insertions(+), 90 deletions(-)

diff --git a/.github/workflows/ci-cpp-build.yaml 
b/.github/workflows/ci-cpp-build.yaml
index 7a450efe972..0d16a1b4cd1 100644
--- a/.github/workflows/ci-cpp-build.yaml
+++ b/.github/workflows/ci-cpp-build.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-2.11
     paths:
       - '.github/workflows/**'
       - 'pulsar-client-cpp/**'
diff --git a/.github/workflows/ci-cpp.yaml b/.github/workflows/ci-cpp.yaml
index 135c41474f5..ddd3ea68b9b 100644
--- a/.github/workflows/ci-cpp.yaml
+++ b/.github/workflows/ci-cpp.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-2.11
   workflow_dispatch:
 
 concurrency:
diff --git a/.github/workflows/pulsar-ci-flaky.yaml 
b/.github/workflows/pulsar-ci-flaky.yaml
index 03e3adff33a..f4a4609cb4c 100644
--- a/.github/workflows/pulsar-ci-flaky.yaml
+++ b/.github/workflows/pulsar-ci-flaky.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-2.11
   workflow_dispatch:
 
 concurrency:
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index e7cf347c06c..b0b5d63349c 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-2.11
   workflow_dispatch:
 
 concurrency:
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5114ddcee54..866cb3d5cc0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1111,6 +1111,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
      *            Flag indicating whether delete should succeed if topic still 
has unconnected subscriptions. Set to
      *            false when called from admin API (it will delete the subs 
too), and set to true when called from GC
      *            thread
+     * @param failIfHasBacklogs
+     *            Flag indicating whether delete should succeed if topic has 
backlogs. Set to false when called from
+     *            admin API (it will delete the subs too), and set to true 
when called from GC thread
      * @param closeIfClientsConnected
      *            Flag indicate whether explicitly close connected
      *            producers/consumers/replicators before trying to delete 
topic.
@@ -1129,102 +1132,108 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             if (isClosingOrDeleting) {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
                 return FutureUtil.failedFuture(new TopicFencedException("Topic 
is already fenced"));
-            } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
-                return FutureUtil.failedFuture(
-                        new TopicBusyException("Topic has subscriptions: " + 
subscriptions.keys()));
-            } else if (failIfHasBacklogs && hasBacklogs()) {
-                List<String> backlogSubs =
-                        subscriptions.values().stream()
-                                .filter(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0)
-                                .map(PersistentSubscription::getName).toList();
-                return FutureUtil.failedFuture(
-                        new TopicBusyException("Topic has subscriptions did 
not catch up: " + backlogSubs));
+            }
+            // We can proceed with the deletion if either:
+            //  1. No one is connected and no subscriptions
+            //  2. The topic have subscriptions but no backlogs for all 
subscriptions
+            //     if delete_when_no_subscriptions is applied
+            //  3. We want to kick out everyone and forcefully delete the 
topic.
+            //     In this case, we shouldn't care if the usageCount is 0 or 
not, just proceed
+            if (!closeIfClientsConnected) {
+                if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+                    return FutureUtil.failedFuture(
+                            new TopicBusyException("Topic has subscriptions: " 
+ subscriptions.keys()));
+                } else if (failIfHasBacklogs) {
+                    if (hasBacklogs()) {
+                        List<String> backlogSubs =
+                                subscriptions.values().stream()
+                                        .filter(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0)
+                                        
.map(PersistentSubscription::getName).toList();
+                        return FutureUtil.failedFuture(
+                                new TopicBusyException("Topic has 
subscriptions did not catch up: " + backlogSubs));
+                    } else if (!producers.isEmpty()) {
+                        return FutureUtil.failedFuture(new TopicBusyException(
+                                "Topic has " + producers.size() + " connected 
producers"));
+                    }
+                } else if (currentUsageCount() > 0) {
+                    return FutureUtil.failedFuture(new TopicBusyException(
+                            "Topic has " + currentUsageCount() + " connected 
producers/consumers"));
+                }
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
             CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
             if (closeIfClientsConnected) {
-                List<CompletableFuture<Void>> futures = Lists.newArrayList();
                 replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
                 producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
-                subscriptions.forEach((s, sub) -> 
futures.add(sub.disconnect()));
-                FutureUtil.waitForAll(futures).thenRun(() -> {
-                    closeClientFuture.complete(null);
-                }).exceptionally(ex -> {
-                    log.error("[{}] Error closing clients", topic, ex);
-                    unfenceTopicToResume();
-                    closeClientFuture.completeExceptionally(ex);
-                    return null;
-                });
-            } else {
-                closeClientFuture.complete(null);
             }
+            FutureUtil.waitForAll(futures).thenRun(() -> {
+                closeClientFuture.complete(null);
+            }).exceptionally(ex -> {
+                log.error("[{}] Error closing clients", topic, ex);
+                unfenceTopicToResume();
+                closeClientFuture.completeExceptionally(ex);
+                return null;
+            });
 
             closeClientFuture.thenAccept(delete -> {
-                // We can proceed with the deletion if either:
-                //  1. No one is connected
-                //  2. We want to kick out everyone and forcefully delete the 
topic.
-                //     In this case, we shouldn't care if the usageCount is 0 
or not, just proceed
-                if (currentUsageCount() ==  0 || (closeIfClientsConnected && 
!failIfHasSubscriptions)) {
-                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
-                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-
-                    deleteTopicAuthenticationFuture.thenCompose(__ -> 
deleteSchema())
-                            .thenCompose(__ -> deleteTopicPolicies())
-                            .thenCompose(__ -> 
transactionBufferCleanupAndClose())
-                            .whenComplete((v, ex) -> {
-                        if (ex != null) {
-                            log.error("[{}] Error deleting topic", topic, ex);
-                            unfenceTopicToResume();
-                            deleteFuture.completeExceptionally(ex);
-                        } else {
-                            List<CompletableFuture<Void>> subsDeleteFutures = 
new ArrayList<>();
-                            subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
-
-                            
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                if (e != null) {
-                                    log.error("[{}] Error deleting topic", 
topic, e);
-                                    unfenceTopicToResume();
-                                    deleteFuture.completeExceptionally(e);
-                                } else {
-                                    ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                        @Override
-                                        public void 
deleteLedgerComplete(Object ctx) {
-                                            
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                            unregisterTopicPolicyListener();
-
-                                            log.info("[{}] Topic deleted", 
topic);
-                                            deleteFuture.complete(null);
-                                        }
-
-                                        @Override
-                                        public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                            if (exception.getCause()
-                                                    instanceof 
MetadataStoreException.NotFoundException) {
-                                                log.info("[{}] Topic is 
already deleted {}",
-                                                        topic, 
exception.getMessage());
-                                                deleteLedgerComplete(ctx);
-                                            } else {
-                                                unfenceTopicToResume();
-                                                log.error("[{}] Error deleting 
topic", topic, exception);
-                                                
deleteFuture.completeExceptionally(new PersistenceException(exception));
+                CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
+                brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+                deleteTopicAuthenticationFuture.thenCompose(__ -> 
deleteSchema())
+                        .thenCompose(__ -> deleteTopicPolicies())
+                        .thenCompose(__ -> transactionBufferCleanupAndClose())
+                        .whenComplete((v, ex) -> {
+                            if (ex != null) {
+                                log.error("[{}] Error deleting topic", topic, 
ex);
+                                unfenceTopicToResume();
+                                deleteFuture.completeExceptionally(ex);
+                            } else {
+                                List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+
+                                
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                    if (e != null) {
+                                        log.error("[{}] Error deleting topic", 
topic, e);
+                                        unfenceTopicToResume();
+                                        deleteFuture.completeExceptionally(e);
+                                    } else {
+                                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                            @Override
+                                            public void 
deleteLedgerComplete(Object ctx) {
+                                                
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+                                                
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                                
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                                
unregisterTopicPolicyListener();
+
+                                                log.info("[{}] Topic deleted", 
topic);
+                                                deleteFuture.complete(null);
                                             }
-                                        }
-                                    }, null);
-                                }
-                            });
-                        }
-                    });
-                } else {
-                    unfenceTopicToResume();
-                    deleteFuture.completeExceptionally(new TopicBusyException(
-                            "Topic has " + currentUsageCount() + " connected 
producers/consumers"));
-                }
+
+                                            @Override
+                                            public void 
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                           
Object ctx) {
+                                                if (exception.getCause()
+                                                        instanceof 
MetadataStoreException.NotFoundException) {
+                                                    log.info("[{}] Topic is 
already deleted {}",
+                                                            topic, 
exception.getMessage());
+                                                    deleteLedgerComplete(ctx);
+                                                } else {
+                                                    unfenceTopicToResume();
+                                                    log.error("[{}] Error 
deleting topic", topic, exception);
+                                                    
deleteFuture.completeExceptionally(
+                                                            new 
PersistenceException(exception));
+                                                }
+                                            }
+                                        }, null);
+                                    }
+                                });
+                            }
+                        });
             }).exceptionally(ex->{
                 unfenceTopicToResume();
                 deleteFuture.completeExceptionally(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 4a372c31b15..25089fdfdbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -328,7 +328,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
             producer.send("Pulsar".getBytes());
         }
 
-        consumer.close();
         producer.close();
 
         Thread.sleep(2000);
@@ -338,6 +337,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase 
{
         admin.topics().skipAllMessages(topic, "sub");
         Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
+        consumer.close();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 1c02f2ea57e..0adeca4bcdb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
index e1c1d3372c3..552f0954c19 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
index 5fd3c1f34a0..70c11a06dfb 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
index 17018be02be..94a93fc1db1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
index 2f73a6af406..d396e7e693a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Reply via email to