This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5978edcb96dca08d66a1fdb711a66ecaa530652c
Author: lipenghui <[email protected]>
AuthorDate: Wed Jan 12 21:57:34 2022 +0800

    Avoid call sync method in async rest API for delete subscription (#13666)
    
    (cherry picked from commit 8dc506c66c9d49e1393c20050ee39326d3788b7f)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 59 ++++++++++++----------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 23f5742..ef2cee7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1467,35 +1467,38 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String 
subName, boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
-            Topic topic = getTopicReference(topicName);
-            Subscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
-            }
-            sub.delete().get();
-            log.info("[{}][{}] Deleted subscription {}", clientAppId(), 
topicName, subName);
-            asyncResponse.resume(Response.noContent().build());
-        } catch (Exception e) {
-            if (e.getCause() instanceof SubscriptionBusyException) {
-                log.error("[{}] Failed to delete subscription {} from topic 
{}", clientAppId(), subName, topicName, e);
-                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                    "Subscription has active connected consumers"));
-            } else if (e instanceof WebApplicationException) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to delete subscription from topic 
{}, redirecting to other brokers.",
-                            clientAppId(), topicName, e);
+        validateTopicOwnershipAsync(topicName, authoritative)
+            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+            .thenCompose(__ -> {
+                Topic topic = getTopicReference(topicName);
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND, "Subscription 
not found");
                 }
-                asyncResponse.resume(e);
-            } else {
-                log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicName, subName, e);
-                asyncResponse.resume(new RestException(e));
-            }
-        }
+                return sub.delete();
+            }).thenRun(() -> {
+                log.info("[{}][{}] Deleted subscription {}", clientAppId(), 
topicName, subName);
+                asyncResponse.resume(Response.noContent().build());
+            }).exceptionally(e -> {
+                Throwable cause = e.getCause();
+                if (cause instanceof SubscriptionBusyException) {
+                    log.error("[{}] Failed to delete subscription {} from 
topic {}", clientAppId(), subName,
+                            topicName, cause);
+                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                            "Subscription has active connected consumers"));
+                } else if (cause instanceof WebApplicationException) {
+                    if (log.isDebugEnabled() && ((WebApplicationException) 
cause).getResponse().getStatus()
+                            == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        log.debug("[{}] Failed to delete subscription from 
topic {}, redirecting to other brokers.",
+                                clientAppId(), topicName, cause);
+                    }
+                    asyncResponse.resume(cause);
+                } else {
+                    log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicName, subName, cause);
+                    asyncResponse.resume(new RestException(cause));
+                }
+                return null;
+            });
     }
 
     protected void internalDeleteSubscriptionForcefully(AsyncResponse 
asyncResponse,

Reply via email to