This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4f804145e216c842dd4a174819426941b2465929
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jan 13 00:21:29 2022 +0800

    Avoid call sync method in async rest API for force delete subscription  
(#13668)
    
    (cherry picked from commit 416e7e5d110c35cbd51f7b082b70e944244a4938)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ef2cee7..1c8aa8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1567,33 +1567,34 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
-            Topic topic = getTopicReference(topicName);
-            Subscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
-            }
-            sub.deleteForcefully().get();
-            log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
-            asyncResponse.resume(Response.noContent().build());
-        } catch (Exception e) {
-            if (e instanceof WebApplicationException) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to delete subscription forcefully 
from topic {},"
-                                    + " redirecting to other brokers.",
-                            clientAppId(), topicName, e);
-                }
-                asyncResponse.resume(e);
-            } else {
-                log.error("[{}] Failed to delete subscription forcefully {} 
{}",
-                        clientAppId(), topicName, subName, e);
-                asyncResponse.resume(new RestException(e));
-            }
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND, 
"Subscription not found");
+                    }
+                    return sub.deleteForcefully();
+                }).thenRun(() -> {
+                    log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(e -> {
+                    Throwable cause = e.getCause();
+                    if (cause instanceof WebApplicationException) {
+                        if (log.isDebugEnabled() && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                            log.debug("[{}] Failed to delete subscription from 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                        asyncResponse.resume(cause);
+                    } else {
+                        log.error("[{}] Failed to delete subscription 
forcefully {} {}",
+                                clientAppId(), topicName, subName, cause);
+                        asyncResponse.resume(new RestException(cause));
+                    }
+                    return null;
+                });
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {

Reply via email to