This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f9aae1ffa8cd001494a7c249d3949abe904dbf48
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Jan 18 12:07:39 2022 +0800

    Fix synchronous method calls in the async REST API for 
``internalGetSubscriptionsForNonPartitionedTopic`` (#13745)
    
    (cherry picked from commit 0a046b9f6dc4a1122bbd7d5e8a1da3a74bd2c7b0)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  5 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    | 49 ++++++++++------------
 .../pulsar/broker/web/PulsarWebResource.java       | 38 +++++++++++++----
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ba61953..6e601af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
@@ -789,7 +790,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     protected void resumeAsyncResponseExceptionally(AsyncResponse 
asyncResponse, Throwable throwable) {
         if (throwable instanceof WebApplicationException) {
-            asyncResponse.resume((WebApplicationException) throwable);
+            asyncResponse.resume(throwable);
+        } else if (throwable instanceof 
BrokerServiceException.NotAllowedException) {
+            asyncResponse.resume(new RestException(Status.CONFLICT, 
throwable));
         } else {
             asyncResponse.resume(new RestException(throwable));
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1c8aa8a..25e8ac1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1106,26 +1106,25 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, 
TopicOperation.GET_SUBSCRIPTIONS);
-
-            Topic topic = getTopicReference(topicName);
-            final List<String> subscriptions = Lists.newArrayList();
-            topic.getSubscriptions().forEach((subName, sub) -> 
subscriptions.add(subName));
-            asyncResponse.resume(subscriptions);
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to get subscriptions for 
non-partitioned topic {},"
-                                + " redirecting to other brokers.",
-                        clientAppId(), topicName, wae);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, wae);
-            return;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get list of subscriptions for {}", 
clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenAccept(topic -> 
asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
+                .exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    if (cause instanceof WebApplicationException
+                            && ((WebApplicationException) 
cause).getResponse().getStatus()
+                            == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Failed to get subscriptions 
for non-partitioned topic {},"
+                                                + " redirecting to other 
brokers.", clientAppId(), topicName, cause);
+                            }
+                    } else {
+                        log.error("[{}] Failed to get list of subscriptions 
for {}", clientAppId(), topicName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
     }
 
     protected TopicStats internalGetStats(boolean authoritative, boolean 
getPreciseBacklog,
@@ -3537,13 +3536,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     private CompletableFuture<Topic> getTopicReferenceAsync(TopicName 
topicName) {
         return 
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
-                .thenCompose(optTopic -> {
-                    if (optTopic.isPresent()) {
-                        return 
CompletableFuture.completedFuture(optTopic.get());
-                    } else {
-                        return topicNotFoundReasonAsync(topicName);
-                    }
-                });
+                .thenCompose(optTopic -> optTopic
+                        .map(CompletableFuture::completedFuture)
+                        .orElseGet(() -> topicNotFoundReasonAsync(topicName)));
     }
 
     private RestException topicNotFoundReason(TopicName topicName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d44a812..ac2e4da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -1080,22 +1080,42 @@ public abstract class PulsarWebResource {
     }
 
     public void validateTopicOperation(TopicName topicName, TopicOperation 
operation, String subscription) {
+        try {
+            validateTopicOperationAsync(topicName, operation, 
subscription).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    public CompletableFuture<Void> validateTopicOperationAsync(TopicName 
topicName, TopicOperation operation) {
+       return validateTopicOperationAsync(topicName, operation, null);
+    }
+
+    public CompletableFuture<Void> validateTopicOperationAsync(TopicName 
topicName,
+                                                               TopicOperation 
operation, String subscription) {
         if (pulsar().getConfiguration().isAuthenticationEnabled()
                 && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.UNAUTHORIZED, "Need to 
authenticate to perform the request");
             }
-
             AuthenticationDataHttps authData = clientAuthData();
             authData.setSubscription(subscription);
-
-            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowTopicOperation(topicName, operation, 
originalPrincipal(), clientAppId(), authData);
-
-            if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, 
String.format("Unauthorized to validateTopicOperation for"
-                        + " operation [%s] on topic [%s]", 
operation.toString(), topicName));
-            }
+            return pulsar().getBrokerService().getAuthorizationService()
+                    .allowTopicOperationAsync(topicName, operation, 
originalPrincipal(), clientAppId(), authData)
+                    .thenAccept(isAuthorized -> {
+                        if (!isAuthorized) {
+                            throw new RestException(Status.UNAUTHORIZED, 
String.format(
+                                    "Unauthorized to validateTopicOperation 
for operation [%s] on topic [%s]",
+                                    operation.toString(), topicName));
+                        }
+                    });
+        } else {
+            return CompletableFuture.completedFuture(null);
         }
     }
 

Reply via email to