Jason918 commented on a change in pull request #13888:
URL: https://github.com/apache/pulsar/pull/13888#discussion_r789687849



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4702,37 +4701,38 @@ protected void 
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         }
     }
 
-    private void 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+    private CompletableFuture<Void> 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(

Review comment:
       There is no need to change the return type.
   Please do not add the "Async" suffix; let's keep the method name consistent with the others.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4627,24 +4627,23 @@ protected void 
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
             return;
         }
 
-        // Reject the request if the topic is not global
-        if (!topicName.isGlobal()) {
-            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Cannot get replicated subscriptions on non-global 
topics"));
-            return;
-        }
-
-        // Permission to consume this topic is required
-        try {
-            validateTopicOperation(topicName, 
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
-        } catch (Exception e) {
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
+        // 1.Permission to consume this topic is required
+        // 2.Redirect the request to the peer-cluster if the local cluster is 
not included in the replication clusters
+        CompletableFuture<Void> future =
+                validateTopicOperationAsync(topicName, 
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
+                        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
 
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
-            
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
+            future.thenCompose(
+                    __ -> 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(asyncResponse,
+                            subName, authoritative))
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Failed to get replicated subscription 
status on {} {}", clientAppId(),
+                                topicName, subName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
         } else {
             getPartitionedTopicMetadataAsync(topicName,

Review comment:
       Use `future.thenCompose` here as well, so that this branch also runs only after the validation in `future` has completed.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4702,37 +4701,38 @@ protected void 
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         }
     }
 
-    private void 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+    private CompletableFuture<Void> 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(
+                                                                               
AsyncResponse asyncResponse,
                                                                                
String subName,
                                                                                
boolean authoritative) {
-        try {
-            // Redirect the request to the appropriate broker if this broker 
is not the owner of the topic
-            validateTopicOwnership(topicName, authoritative);
-
-            Topic topic = getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
-                return;
-            }
+        // Redirect the request to the appropriate broker if this broker is 
not the owner of the topic
+        return validateTopicOwnershipAsync(topicName, authoritative)
+        .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(topic 
-> {
+                if (topic == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
+                    return;
+                }
 
-            Subscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
-            }
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+                    return;
+                }
 
-            if (topic instanceof PersistentTopic && sub instanceof 
PersistentSubscription) {
-                Map res = Maps.newHashMap();
-                res.put(topicName.toString(), sub.isReplicated());
-                asyncResponse.resume(res);
-            } else {
-                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
-                        "Cannot get replicated subscriptions on non-persistent 
topics"));
-            }
-        } catch (Exception e) {
+                if (topic instanceof PersistentTopic && sub instanceof 
PersistentSubscription) {
+                    Map res = Maps.newHashMap();
+                    res.put(topicName.toString(), sub.isReplicated());
+                    asyncResponse.resume(res);
+                } else {
+                    asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                            "Cannot get replicated subscriptions on 
non-persistent topics"));
+                }
+            })
+        ).exceptionally(e -> {
             log.error("[{}] Failed to get replicated subscription status on {} 
{}", clientAppId(),
-                    topicName, subName, e);
+                    topicName, subName, e.getCause());
             resumeAsyncResponseExceptionally(asyncResponse, e);

Review comment:
       Use `e.getCause()` here instead of `e`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to