This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc506c Avoid call sync method in async rest API for delete subscription (#13666)
8dc506c is described below
commit 8dc506c66c9d49e1393c20050ee39326d3788b7f
Author: lipenghui <[email protected]>
AuthorDate: Wed Jan 12 21:57:34 2022 +0800
Avoid call sync method in async rest API for delete subscription (#13666)
---
.../broker/admin/impl/PersistentTopicsBase.java | 59 ++++++++++++----------
1 file changed, 31 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e54bb59..3556899 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1505,35 +1505,38 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
- Topic topic = getTopicReference(topicName);
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.delete().get();
- log.info("[{}][{}] Deleted subscription {}", clientAppId(),
topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- } catch (Exception e) {
- if (e.getCause() instanceof SubscriptionBusyException) {
- log.error("[{}] Failed to delete subscription {} from topic
{}", clientAppId(), subName, topicName, e);
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Subscription has active connected consumers"));
- } else if (e instanceof WebApplicationException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to delete subscription from topic
{}, redirecting to other brokers.",
- clientAppId(), topicName, e);
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND, "Subscription
not found");
}
- asyncResponse.resume(e);
- } else {
- log.error("[{}] Failed to delete subscription {} {}",
clientAppId(), topicName, subName, e);
- asyncResponse.resume(new RestException(e));
- }
- }
+ return sub.delete();
+ }).thenRun(() -> {
+ log.info("[{}][{}] Deleted subscription {}", clientAppId(),
topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ Throwable cause = e.getCause();
+ if (cause instanceof SubscriptionBusyException) {
+ log.error("[{}] Failed to delete subscription {} from
topic {}", clientAppId(), subName,
+ topicName, cause);
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ } else if (cause instanceof WebApplicationException) {
+ if (log.isDebugEnabled() && ((WebApplicationException)
cause).getResponse().getStatus()
+ == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ log.debug("[{}] Failed to delete subscription from
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, cause);
+ }
+ asyncResponse.resume(cause);
+ } else {
+ log.error("[{}] Failed to delete subscription {} {}",
clientAppId(), topicName, subName, cause);
+ asyncResponse.resume(new RestException(cause));
+ }
+ return null;
+ });
}
protected void internalDeleteSubscriptionForcefully(AsyncResponse
asyncResponse,