This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5978edcb96dca08d66a1fdb711a66ecaa530652c
Author: lipenghui <[email protected]>
AuthorDate: Wed Jan 12 21:57:34 2022 +0800

    Avoid call sync method in async rest API for delete subscription (#13666)

    (cherry picked from commit 8dc506c66c9d49e1393c20050ee39326d3788b7f)
---
 .../broker/admin/impl/PersistentTopicsBase.java | 59 ++++++++++++----------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 23f5742..ef2cee7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1467,35 +1467,38 @@ public class PersistentTopicsBase extends AdminResource {

     private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
             String subName, boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
-            Topic topic = getTopicReference(topicName);
-            Subscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-                return;
-            }
-            sub.delete().get();
-            log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
-            asyncResponse.resume(Response.noContent().build());
-        } catch (Exception e) {
-            if (e.getCause() instanceof SubscriptionBusyException) {
-                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
-                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                    "Subscription has active connected consumers"));
-            } else if (e instanceof WebApplicationException) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
-                            clientAppId(), topicName, e);
+        validateTopicOwnershipAsync(topicName, authoritative)
+            .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
+            .thenCompose(__ -> {
+                Topic topic = getTopicReference(topicName);
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND, "Subscription not found");
                 }
-                asyncResponse.resume(e);
-            } else {
-                log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
-                asyncResponse.resume(new RestException(e));
-            }
-        }
+                return sub.delete();
+            }).thenRun(() -> {
+                log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
+                asyncResponse.resume(Response.noContent().build());
+            }).exceptionally(e -> {
+                Throwable cause = e.getCause();
+                if (cause instanceof SubscriptionBusyException) {
+                    log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
+                            topicName, cause);
+                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                            "Subscription has active connected consumers"));
+                } else if (cause instanceof WebApplicationException) {
+                    if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus()
+                            == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
+                                clientAppId(), topicName, cause);
+                    }
+                    asyncResponse.resume(cause);
+                } else {
+                    log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
+                    asyncResponse.resume(new RestException(cause));
+                }
+                return null;
+            });
     }

     protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
