This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 61f99cb  [Broker] Fix call sync method in async rest api for 
internalExpireMessagesByPosition (#13878)
61f99cb is described below

commit 61f99cb9676c8eb77d9dc2c3bb8a6aa2115896c0
Author: Dezhi LIiu <[email protected]>
AuthorDate: Thu Jan 27 15:39:01 2022 +0800

    [Broker] Fix call sync method in async rest api for 
internalExpireMessagesByPosition (#13878)
    
    ### Motivation
    Avoid call sync method in async rest API for 
PersistentTopicsBase#internalExpireMessagesByPosition.
    
    ### Modifications
    Use async instead of sync method.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 98 +++++++++++++---------
 1 file changed, 58 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a137fbd..5acb796 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3505,39 +3505,52 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalExpireMessagesByPosition(AsyncResponse 
asyncResponse, String subName, boolean authoritative,
                                                  MessageIdImpl messageId, 
boolean isExcluded, int batchIndex) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to expire messages on subscription 
{} to position {}: {}", clientAppId(),
-                        topicName, subName, messageId, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
-        log.info("[{}][{}] received expire messages on subscription {} to 
position {}", clientAppId(), topicName,
-                subName, messageId);
+        future.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> {
+                    log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
+                            topicName, subName, messageId);
+                    return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                            .thenAccept(partitionMetadata -> {
+                                if (!topicName.isPartitioned() && 
partitionMetadata.partitions > 0) {
+                                    String msg = "Expire message at position 
is not supported for partitioned-topic";
+                                    log.warn("[{}] {} {}({}) {}", 
clientAppId(), msg, topicName, messageId, subName);
+                                    asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, msg));
+                                    return;
+                                } else if (messageId.getPartitionIndex() != 
topicName.getPartitionIndex()) {
+                                    String msg = "Invalid parameter for expire 
message by position, partition index of "
+                                            + "passed in message position 
doesn't match partition index for the topic";
+                                    log.warn("[{}] {} {}({}).", clientAppId(), 
msg, topicName, messageId);
+                                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED, msg));
+                                    return;
+                                } else {
+                                    
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
+                                            messageId, isExcluded, batchIndex);
+                                }
+                            });
+                }).exceptionally(ex -> {
+            Throwable cause = ex.getCause();
+            log.error("[{}] Failed to expire messages up to {} on subscription 
{} to position {}",
+                    clientAppId(), topicName, subName, messageId, cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
+    }
 
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
-            log.warn("[{}] Not supported operation expire message up to {} on 
partitioned-topic {} {}",
-                    clientAppId(), messageId, topicName, subName);
-            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Expire message at position is not supported for 
partitioned-topic"));
-            return;
-        } else if (messageId.getPartitionIndex() != 
topicName.getPartitionIndex()) {
-            log.warn("[{}] Invalid parameter for expire message by position, 
partition index of passed in message"
-                            + " position {} doesn't match partition index of 
topic requested {}.",
-                    clientAppId(), messageId, topicName);
-            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                    "Invalid parameter for expire message by position, 
partition index of message position "
-                            + "passed in doesn't match partition index for the 
topic."));
-        } else {
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
+    private CompletableFuture<Void> 
internalExpireMessagesNonPartitionedTopicByPosition(AsyncResponse asyncResponse,
+                                                                               
         String subName,
+                                                                               
         MessageIdImpl messageId,
+                                                                               
         boolean isExcluded,
+                                                                               
         int batchIndex) {
+        return getTopicReferenceAsync(topicName).thenAccept(t -> {
+            PersistentTopic topic = (PersistentTopic) t;
             if (topic == null) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
                 return;
@@ -3545,7 +3558,8 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 PersistentSubscription sub = topic.getSubscription(subName);
                 if (sub == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                            "Subscription not found"));
                     return;
                 }
                 CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
@@ -3570,23 +3584,22 @@ public class PersistentTopicsBase extends AdminResource 
{
                         } else {
                             if (log.isDebugEnabled()) {
                                 log.debug("Expire message by position not 
issued on topic {} for subscription {} "
-                                        + "due to ongoing message expiration 
not finished or subscription "
-                                        + "almost catch up.", topicName, 
subName);
+                                        + "due to ongoing message expiration 
not finished or subscription almost "
+                                        + "catch up.", topicName, subName);
                             }
                             throw new RestException(Status.CONFLICT, "Expire 
message by position not issued on topic "
-                                    + topicName + " for subscription " + 
subName + " due to ongoing message expiration"
-                                    + " not finished or invalid message 
position provided.");
+                                    + topicName + " for subscription " + 
subName + " due to ongoing"
+                                    + " message expiration not finished or 
invalid message position provided.");
                         }
-                    } catch (NullPointerException npe) {
-                        throw new RestException(Status.NOT_FOUND, 
"Subscription not found");
                     } catch (Exception exception) {
                         log.error("[{}] Failed to expire messages up to {} on 
{} with subscription {} {}",
                                 clientAppId(), position, topicName, subName, 
exception);
                         throw new RestException(exception);
                     }
+                    asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(e -> {
-                    log.error("[{}] Failed to expire messages up to {} on {} 
with subscription {} {}", clientAppId(),
-                            messageId, topicName, subName, e);
+                    log.error("[{}] Failed to expire messages up to {} on {} 
with subscription {} {}",
+                            clientAppId(), messageId, topicName, subName, e);
                     asyncResponse.resume(e);
                     return null;
                 });
@@ -3595,8 +3608,13 @@ public class PersistentTopicsBase extends AdminResource {
                         clientAppId(), topicName, messageId, subName, 
messageId, e);
                 resumeAsyncResponseExceptionally(asyncResponse, e);
             }
-        }
-        asyncResponse.resume(Response.noContent().build());
+        }).exceptionally(ex -> {
+            Throwable cause = ex.getCause();
+            log.error("[{}] Failed to expire messages up to {} on subscription 
{} to position {}", clientAppId(),
+                    topicName, subName, messageId, cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
     }
 
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {

Reply via email to