Technoboy- commented on a change in pull request #13888:
URL: https://github.com/apache/pulsar/pull/13888#discussion_r792515120
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4704,104 +4704,110 @@ protected void
internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
}
// Permission to consume this topic is required
- try {
- validateTopicOperation(topicName,
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
- } catch (Exception e) {
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ CompletableFuture<Void> validateFuture =
+ validateTopicOperationAsync(topicName,
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+ CompletableFuture<Void> resultFuture;
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
-
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName, authoritative);
+ resultFuture = validateFuture.thenAccept(
+ __ ->
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
+ subName, authoritative));
} else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Map<String, Boolean>>>
futures = Lists.newArrayList();
- final Map<String, Boolean> status = Maps.newHashMap();
+ resultFuture = validateFuture
+ .thenCompose(__ ->
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Map<String,
Boolean>>> futures = Lists.newArrayList();
+ final Map<String, Boolean> status =
Maps.newHashMap();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName partition = topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
- partition.toString(),
subName).whenComplete((response, throwable) -> {
- if (throwable != null) {
- log.error("[{}] Failed to get replicated
subscriptions on {} {}",
- clientAppId(), partition, subName,
throwable);
- asyncResponse.resume(new
RestException(throwable));
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName partition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
+ partition.toString(),
subName).whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ log.error("[{}] Failed to get
replicated subscriptions on {} {}",
+ clientAppId(), partition,
subName, throwable);
+ asyncResponse.resume(new
RestException(throwable));
+ }
+ status.putAll(response);
+ }));
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get replicated
subscription status on {} {}",
+ clientAppId(), partition, subName,
e);
+ throw new RestException(e);
}
- status.putAll(response);
- }));
- } catch (Exception e) {
- log.warn("[{}] Failed to get replicated
subscription status on {} {}",
- clientAppId(), partition, subName, e);
- throw new RestException(e);
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Topic or subscription not found"));
- } else if (t instanceof
PreconditionFailedException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Cannot get replicated subscriptions
on non-global topics"));
- } else {
- log.error("[{}] Failed to get replicated
subscription status on {} {}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
}
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Topic or subscription not
found"));
+ } else if (t instanceof
PreconditionFailedException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Cannot get replicated
subscriptions on non-global topics"));
+ } else {
+ log.error("[{}] Failed to get
replicated subscription status on {} {}",
+ clientAppId(), topicName,
subName, t);
+ asyncResponse.resume(new
RestException(t));
+ }
+ }
+ asyncResponse.resume(status);
+ return null;
+ });
+ } else {
+
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName,
+ authoritative);
}
- asyncResponse.resume(status);
- return null;
});
- } else {
-
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName,
- authoritative);
- }
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to get replicated subscription status on
{} {}", clientAppId(),
- topicName, subName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
}
+
+ resultFuture.exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.warn("[{}] Failed to get replicated subscription status on {}
{}", clientAppId(),
+ topicName, subName, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
- private void
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
Review comment:
We can make
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic to return
CompletableFuture<Void>
then we can delete 4805~4811
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]