codelipenghui commented on a change in pull request #14062:
URL: https://github.com/apache/pulsar/pull/14062#discussion_r801667655



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2540,58 +2540,81 @@ protected Response internalPeekNthMessage(String 
subName, int messagePosition, b
         }
     }
 
-    protected Response internalExamineMessage(String initialPosition, long 
messagePosition, boolean authoritative) {
+    protected void internalExamineMessageAsync(AsyncResponse asyncResponse, 
String initialPosition,
+                                               long messagePosition, boolean 
authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a partitioned topic is not allowed, "
-                            + "please try examine message on specific topic 
partition");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a non-persistent topic is not 
allowed");
-        }
+        long msgPos = messagePosition < 1 ? 1 : messagePosition;
+        String initPos = initialPosition == null ? "latest" : initialPosition;
 
-        if (messagePosition < 1) {
-            messagePosition = 1;
-        }
 
-        if (null == initialPosition) {
-            initialPosition = "latest";
+        if (!topicName.isPartitioned()) {
+            future = future.thenCompose(__ -> 
getPartitionedTopicMetadataAsync(topicName,
+                            authoritative, false))
+                    .thenAccept(partitionedTopicMetadata -> {
+                        if (partitionedTopicMetadata.partitions > 0) {
+                            String msg = "Examine messages on a partitioned 
topic is not allowed, "
+                                    + "please try examine message on specific 
topic partition";
+                            log.warn("[{}] {} topicName: {}", clientAppId(), 
msg, topicName);
+                            throw new RestException(Status.METHOD_NOT_ALLOWED, 
msg);
+                        }
+                    });
         }
 
-        try {
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            long totalMessage = topic.getNumberOfEntries();
-            PositionImpl startPosition = topic.getFirstPosition();
-            long messageToSkip =
-                    initialPosition.equals("earliest") ? messagePosition : 
totalMessage - messagePosition + 1;
-            CompletableFuture<Entry> future = new CompletableFuture<>();
-            PositionImpl readPosition = topic.getPositionAfterN(startPosition, 
messageToSkip);
-            topic.asyncReadEntry(readPosition, new 
AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    future.complete(entry);
-                }
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic)) {
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Examine messages on a non-persistent topic is 
not allowed");
+                    }
 
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
-                    future.completeExceptionally(exception);
-                }
-            }, null);
-            return generateResponseWithEntry(future.get());
-        } catch (Exception exception) {
-            exception.printStackTrace();
-            log.error("[{}] Failed to examine message at position {} from {} 
due to {}", clientAppId(), messagePosition,
-                    topicName, exception);
-            throw new RestException(exception);
-        }
+                    try {
+                        PersistentTopic persistentTopic = (PersistentTopic) 
topic;
+                        long totalMessage = 
persistentTopic.getNumberOfEntries();
+                        PositionImpl startPosition = 
persistentTopic.getFirstPosition();
+                        long messageToSkip =
+                                initPos.equals("earliest") ? msgPos : 
totalMessage - msgPos + 1;
+                        CompletableFuture<Entry> readEntry = new 
CompletableFuture<>();
+                        PositionImpl readPosition = 
persistentTopic.getPositionAfterN(startPosition, messageToSkip);
+                        persistentTopic.asyncReadEntry(readPosition, new 
AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object 
ctx) {
+                                readEntry.complete(entry);
+                            }
+
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+                                readEntry.completeExceptionally(exception);
+                            }
+                        }, null);
+                        return readEntry;
+                    } catch (Exception exception) {

Review comment:
   ```suggestion
                       } catch (ManagedLedgerException exception) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to