liudezhi2098 commented on a change in pull request #13871:
URL: https://github.com/apache/pulsar/pull/13871#discussion_r789395887



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2783,49 +2783,55 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
 
     protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse,
                                                      MessageIdImpl messageId, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to get backlog size for topic {}", 
clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
-        }
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName,
-                authoritative, false);
-        if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
-            log.warn("[{}] Not supported calculate backlog size operation on 
partitioned-topic {}",
-                    clientAppId(), topicName);
-            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                    "calculate backlog size is not allowed for 
partitioned-topic"));
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
-                return;
-            }
-            try {
-                ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
-                if (messageId.getLedgerId() == -1) {
-                    asyncResponse.resume(managedLedger.getTotalSize());
-                } else {
-                    
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
-                }
-            } catch (WebApplicationException wae) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to get backlog size for topic {}, 
redirecting to other brokers.",
-                            clientAppId(), topicName, wae);
-                }
-                resumeAsyncResponseExceptionally(asyncResponse, wae);
-            } catch (Exception e) {
-                log.error("[{}] Failed to get backlog size for topic {}", 
clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-            }
+            future = CompletableFuture.completedFuture(null);
         }
+        future.thenAccept(__ -> {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenAccept(partitionMetadata -> {
+                        if (!topicName.isPartitioned() && 
partitionMetadata.partitions > 0) {
+                            log.warn("[{}] Not supported calculate backlog 
size operation on partitioned-topic {}",
+                                    clientAppId(), topicName);
+                            asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                    "calculate backlog size is not allowed for 
partitioned-topic"));
+                        } else {
+                            validateTopicOwnership(topicName, authoritative);
+                            validateTopicOperation(topicName, 
TopicOperation.GET_BACKLOG_SIZE);
+                            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
+                            PositionImpl pos = new 
PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+                            if (topic == null) {
+                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Topic not found"));
+                                return;
+                            }
+                            try {
+                                ManagedLedgerImpl managedLedger = 
(ManagedLedgerImpl) topic.getManagedLedger();
+                                if (messageId.getLedgerId() == -1) {
+                                    
asyncResponse.resume(managedLedger.getTotalSize());
+                                } else {
+                                    
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                                }
+                            } catch (WebApplicationException wae) {

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to