Technoboy- commented on a change in pull request #13666:
URL: https://github.com/apache/pulsar/pull/13666#discussion_r780245222
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1505,35 +1505,37 @@ protected void internalDeleteSubscription(AsyncResponse
asyncResponse, String su
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
- Topic topic = getTopicReference(topicName);
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.delete().get();
- log.info("[{}][{}] Deleted subscription {}", clientAppId(),
topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- } catch (Exception e) {
- if (e.getCause() instanceof SubscriptionBusyException) {
- log.error("[{}] Failed to delete subscription {} from topic
{}", clientAppId(), subName, topicName, e);
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Subscription has active connected consumers"));
- } else if (e instanceof WebApplicationException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to delete subscription from topic
{}, redirecting to other brokers.",
- clientAppId(), topicName, e);
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND, "Subscription
not found");
}
- asyncResponse.resume(e);
- } else {
- log.error("[{}] Failed to delete subscription {} {}",
clientAppId(), topicName, subName, e);
- asyncResponse.resume(new RestException(e));
- }
- }
+ return sub.delete();
+ }).thenRun(() -> {
+ log.info("[{}][{}] Deleted subscription {}", clientAppId(),
topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ if (e.getCause() instanceof SubscriptionBusyException) {
+ log.error("[{}] Failed to delete subscription {} from
topic {}", clientAppId(), subName,
+ topicName, e);
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ } else if (e instanceof WebApplicationException) {
Review comment:
e -> e.getCause()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]