Jason918 commented on a change in pull request #13888:
URL: https://github.com/apache/pulsar/pull/13888#discussion_r789687849
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4702,37 +4701,38 @@ protected void
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
}
}
- private void
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ private CompletableFuture<Void>
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(
Review comment:
There is no need to change the return type.
Do not add the "Async" suffix; let's keep the method name consistent with the others.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4627,24 +4627,23 @@ protected void
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
return;
}
- // Reject the request if the topic is not global
- if (!topicName.isGlobal()) {
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
- "Cannot get replicated subscriptions on non-global
topics"));
- return;
- }
-
- // Permission to consume this topic is required
- try {
- validateTopicOperation(topicName,
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
- } catch (Exception e) {
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ // 1.Permission to consume this topic is required
+ // 2.Redirect the request to the peer-cluster if the local cluster is
not included in the replication clusters
+ CompletableFuture<Void> future =
+ validateTopicOperationAsync(topicName,
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
-
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName, authoritative);
+ future.thenCompose(
+ __ ->
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(asyncResponse,
+ subName, authoritative))
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to get replicated subscription
status on {} {}", clientAppId(),
+ topicName, subName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
} else {
getPartitionedTopicMetadataAsync(topicName,
Review comment:
Use `future.thenCompose` here as well, so that this branch is also chained after the validation future.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4702,37 +4701,38 @@ protected void
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
}
}
- private void
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ private CompletableFuture<Void>
internalGetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(
+
AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
- try {
- // Redirect the request to the appropriate broker if this broker
is not the owner of the topic
- validateTopicOwnership(topicName, authoritative);
-
- Topic topic = getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
- return;
- }
+ // Redirect the request to the appropriate broker if this broker is
not the owner of the topic
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(topic
-> {
+ if (topic == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
+ return;
+ }
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
+ return;
+ }
- if (topic instanceof PersistentTopic && sub instanceof
PersistentSubscription) {
- Map res = Maps.newHashMap();
- res.put(topicName.toString(), sub.isReplicated());
- asyncResponse.resume(res);
- } else {
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
- "Cannot get replicated subscriptions on non-persistent
topics"));
- }
- } catch (Exception e) {
+ if (topic instanceof PersistentTopic && sub instanceof
PersistentSubscription) {
+ Map res = Maps.newHashMap();
+ res.put(topicName.toString(), sub.isReplicated());
+ asyncResponse.resume(res);
+ } else {
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Cannot get replicated subscriptions on
non-persistent topics"));
+ }
+ })
+ ).exceptionally(e -> {
log.error("[{}] Failed to get replicated subscription status on {}
{}", clientAppId(),
- topicName, subName, e);
+ topicName, subName, e.getCause());
resumeAsyncResponseExceptionally(asyncResponse, e);
Review comment:
Use `e.getCause()` here as well, as in the log call above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]