suiyuzeng commented on a change in pull request #13888:
URL: https://github.com/apache/pulsar/pull/13888#discussion_r793219191



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4704,104 +4704,110 @@ protected void 
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         }
 
         // Permission to consume this topic is required
-        try {
-            validateTopicOperation(topicName, 
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
-        } catch (Exception e) {
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
+        CompletableFuture<Void> validateFuture =
+                validateTopicOperationAsync(topicName, 
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
 
+        CompletableFuture<Void> resultFuture;
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
-            
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
+            resultFuture = validateFuture.thenAccept(
+                    __ -> 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
+                            subName, authoritative));
         } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Map<String, Boolean>>> 
futures = Lists.newArrayList();
-                    final Map<String, Boolean> status = Maps.newHashMap();
+            resultFuture = validateFuture
+                    .thenCompose(__ -> 
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                    .thenAccept(partitionMetadata -> {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Map<String, 
Boolean>>> futures = Lists.newArrayList();
+                            final Map<String, Boolean> status = 
Maps.newHashMap();
 
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName partition = topicName.getPartition(i);
-                        try {
-                            
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                    partition.toString(), 
subName).whenComplete((response, throwable) -> {
-                                if (throwable != null) {
-                                    log.error("[{}] Failed to get replicated 
subscriptions on {} {}",
-                                            clientAppId(), partition, subName, 
throwable);
-                                    asyncResponse.resume(new 
RestException(throwable));
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName partition = 
topicName.getPartition(i);
+                                try {
+                                    
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
+                                            partition.toString(), 
subName).whenComplete((response, throwable) -> {
+                                        if (throwable != null) {
+                                            log.error("[{}] Failed to get 
replicated subscriptions on {} {}",
+                                                    clientAppId(), partition, 
subName, throwable);
+                                            asyncResponse.resume(new 
RestException(throwable));
+                                        }
+                                        status.putAll(response);
+                                    }));
+                                } catch (Exception e) {
+                                    log.warn("[{}] Failed to get replicated 
subscription status on {} {}",
+                                            clientAppId(), partition, subName, 
e);
+                                    throw new RestException(e);
                                 }
-                                status.putAll(response);
-                            }));
-                        } catch (Exception e) {
-                            log.warn("[{}] Failed to get replicated 
subscription status on {} {}",
-                                    clientAppId(), partition, subName, e);
-                            throw new RestException(e);
-                        }
-                    }
-
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable t = exception.getCause();
-                            if (t instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                        "Topic or subscription not found"));
-                            } else if (t instanceof 
PreconditionFailedException) {
-                                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                                        "Cannot get replicated subscriptions 
on non-global topics"));
-                            } else {
-                                log.error("[{}] Failed to get replicated 
subscription status on {} {}",
-                                        clientAppId(), topicName, subName, t);
-                                asyncResponse.resume(new RestException(t));
                             }
+
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    if (t instanceof NotFoundException) {
+                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                "Topic or subscription not 
found"));
+                                    } else if (t instanceof 
PreconditionFailedException) {
+                                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                                "Cannot get replicated 
subscriptions on non-global topics"));
+                                    } else {
+                                        log.error("[{}] Failed to get 
replicated subscription status on {} {}",
+                                                clientAppId(), topicName, 
subName, t);
+                                        asyncResponse.resume(new 
RestException(t));
+                                    }
+                                }
+                                asyncResponse.resume(status);
+                                return null;
+                            });
+                        } else {
+                            
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName,
+                                    authoritative);
                         }
-                        asyncResponse.resume(status);
-                        return null;
                     });
-                } else {
-                    
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName,
-                            authoritative);
-                }
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to get replicated subscription status on 
{} {}", clientAppId(),
-                        topicName, subName, ex);
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
-            });
         }
+
+        resultFuture.exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.warn("[{}] Failed to get replicated subscription status on {} 
{}", clientAppId(),
+                    topicName, subName, cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
     }
 
-    private void 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+    private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(

Review comment:
       Since the AsyncResponse is passed in, the function can handle any exception 
itself, so the caller does not need to care about the result. How about doing it this way? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to