mattisonchao commented on a change in pull request #13876:
URL: https://github.com/apache/pulsar/pull/13876#discussion_r789370010



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2383,42 +2383,55 @@ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
 
     protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
                                               boolean authoritative) {
-        try {
-            // will redirect if the topic not owned by current broker
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-            ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                    asyncResponse.resume(new RestException(exception));
-                }
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> {
+                    CompletableFuture<Void> ret;
+                    if (topicName.isGlobal()) {
+                        ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);
+                    }
+                    return ret.thenCompose(ignore -> {
+                        return getTopicReferenceAsync(topicName)
+                                .thenAccept(topic -> {
+                                    ManagedLedgerImpl ledger =
+                                            (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
+                                    ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
+                                            new AsyncCallbacks.ReadEntryCallback() {
+                                                @Override
+                                                public void readEntryFailed(ManagedLedgerException exception,
+                                                                            Object ctx) {
+                                                    asyncResponse.resume(new RestException(exception));
+                                                }
 
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    try {
-                        asyncResponse.resume(generateResponseWithEntry(entry));
-                    } catch (IOException exception) {
-                        asyncResponse.resume(new RestException(exception));
-                    } finally {
-                        if (entry != null) {
-                            entry.release();
-                        }
+                                                @Override
+                                                public void readEntryComplete(Entry entry, Object ctx) {
+                                                    try {
+                                                        asyncResponse.resume(generateResponseWithEntry(entry));
+                                                    } catch (IOException exception) {
+                                                        asyncResponse.resume(new RestException(exception));
+                                                    } finally {
+                                                        if (entry != null) {
+                                                            entry.release();
+                                                        }
+                                                    }
+                                                }
+                                            }, null);
+                                });
+                    });
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();

Review comment:
       Well, I think we are probably missing handling for the redirect ``RestException`` here.
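
       To illustrate the concern, here is a minimal, self-contained sketch of an `exceptionally` handler that tells a redirect apart from a genuine failure. Everything in it is hypothetical and is not Pulsar code: `StatusException` stands in for a REST exception that carries an HTTP status, `unwrap` and `isRedirect` are illustrative helpers, and 307 (Temporary Redirect) is assumed to be the status used for the redirect. The actual handling in this PR may well look different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class RedirectAwareHandler {

    /** Hypothetical stand-in for a REST exception that carries an HTTP status code. */
    static class StatusException extends RuntimeException {
        final int status;

        StatusException(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    // Assumption: the redirect is signalled with HTTP 307 (Temporary Redirect).
    static final int TEMPORARY_REDIRECT = 307;

    /** Strips CompletionException/ExecutionException wrappers added by dependent stages. */
    static Throwable unwrap(Throwable ex) {
        Throwable t = ex;
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Returns true when the underlying failure is a redirect rather than a real error. */
    static boolean isRedirect(Throwable ex) {
        Throwable cause = unwrap(ex);
        return cause instanceof StatusException
                && ((StatusException) cause).status == TEMPORARY_REDIRECT;
    }

    /** Maps a failed future to a short description, keeping redirects distinct from errors. */
    static String handle(CompletableFuture<String> future) {
        return future.exceptionally(ex -> {
            Throwable cause = unwrap(ex);
            if (isRedirect(ex)) {
                // A redirect is expected behaviour: pass it through, do not log it as an error.
                return "redirect: " + cause.getMessage();
            }
            return "error: " + cause.getMessage();
        }).join();
    }
}
```

       The point of unwrapping first is that a failure in an upstream stage usually reaches `exceptionally` wrapped in a `CompletionException`, while a failure on the same future arrives unwrapped, so calling `ex.getCause()` unconditionally can miss the redirect or return `null`.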



