mattisonchao commented on a change in pull request #14039:
URL: https://github.com/apache/pulsar/pull/14039#discussion_r795109783



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2494,50 +2494,71 @@ public void readEntryComplete(Entry entry, Object ctx) {
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int 
messagePosition, boolean authoritative) {
+    protected void internalPeekNthMessage(AsyncResponse asyncResponse, String 
subName, int messagePosition,
+                                              boolean authoritative) {
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName,
                 authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
+            resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                    "Peek messages on a partitioned topic is not allowed"));
+            return;
         }
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
-        } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, 
topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", 
clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry = null;

Review comment:
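       This variable may cause an NPE when ``!(topic instanceof PersistentTopic)``:
in that branch ``entry`` stays ``null``, so the ``thenCompose`` lambda returns
``null`` instead of a ``CompletableFuture``.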

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2494,50 +2494,71 @@ public void readEntryComplete(Entry entry, Object ctx) {
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int 
messagePosition, boolean authoritative) {
+    protected void internalPeekNthMessage(AsyncResponse asyncResponse, String 
subName, int messagePosition,
+                                              boolean authoritative) {
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName,
                 authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
+            resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                    "Peek messages on a partitioned topic is not allowed"));
+            return;
         }
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
-        } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, 
topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", 
clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry = null;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of 
non-persistent topic {} {}",
+                                clientAppId(), topicName, subName);
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is 
not allowed"));
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) 
topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = 
getReplicatorReference(subName, ((PersistentTopic) topic));
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub = 
(PersistentSubscription) getSubscriptionReference(
+                                    subName, ((PersistentTopic) topic));
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                })
+                .thenAccept(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = 
generateResponseWithEntry(entry);
+                            asyncResponse.resume(response);
+                        } catch (NullPointerException npe) {

Review comment:
       Why do we need to catch the NPE here? It looks like we should fix the 
cause of the NPE instead of catching it.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2494,50 +2494,71 @@ public void readEntryComplete(Entry entry, Object ctx) {
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int 
messagePosition, boolean authoritative) {
+    protected void internalPeekNthMessage(AsyncResponse asyncResponse, String 
subName, int messagePosition,
+                                              boolean authoritative) {
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName,

Review comment:
       ``getPartitionedTopicMetadata`` looks like a synchronous method, even 
though the rest of this method is now asynchronous.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to