Jason918 commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r793182579



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,135 @@ protected void internalSkipMessages(String subName, 
int numMessages, boolean aut
 
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                    expireTimeInSeconds, authoritative);
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                partitionMetadata, expireTimeInSeconds, 
authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures =
+                                    
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
 
-                    // expire messages for each partition topic
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                            // expire messages for each partition topic
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                            .getAdminClient()
+                                            .topics()
+                                            
.expireMessagesForAllSubscriptionsAsync(
+                                                    
topicNamePartition.toString(), expireTimeInSeconds));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicNamePartition, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
+
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                                
asyncResponse.resume(Response.noContent().build());
+                                return null;
+                            });
+                        } else {
+                            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                    partitionMetadata, expireTimeInSeconds, 
authoritative);
+                        }
+                    }
+                }
+             )
+        ).exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed to expire messages for all subscription on 
topic {}", clientAppId(), topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
+
+    }
+
+    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                                               
  PartitionedTopicMetadata
+                                                                               
  partitionMetadata,
+                                                                               
  int expireTimeInSeconds,
+                                                                               
  boolean authoritative) {
+        try {
+            // validate ownership and redirect if current broker is not owner
+            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                    if (t == null) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.NOT_FOUND,
+                                "Topic not found"));
+                        return;
+                    }
+                    if (!(t instanceof PersistentTopic)) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Expire messages for all subscriptions on a 
non-persistent topic is not allowed"));
+                        return;
+                    }
+                    PersistentTopic topic = (PersistentTopic) t;
+                    final List<CompletableFuture<Void>> futures =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size());
+                    List<String> subNames =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size()
+                                    + (int) topic.getSubscriptions().size());
+                    subNames.addAll(topic.getReplicators().keys());
+                    subNames.addAll(topic.getSubscriptions().keys());
+                    for (int i = 0; i < subNames.size(); i++) {
                         try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    .expireMessagesForAllSubscriptionsAsync(
-                                            topicNamePartition.toString(), 
expireTimeInSeconds));
+                            
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                    subNames.get(i), expireTimeInSeconds, 
authoritative));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicNamePartition, e);
+                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds, 
topicName, e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
                     }
 
                     FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
                         if (exception != null) {
-                            Throwable t = exception.getCause();
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
+                            Throwable throwable = exception.getCause();
+                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds, 
topicName, throwable);
+                            asyncResponse.resume(new RestException(throwable));
                             return null;
                         }
-
                         asyncResponse.resume(Response.noContent().build());
                         return null;
                     });
-                } else {
-                    
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                            expireTimeInSeconds, authoritative);
-                }
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, ex);
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
-            });
-        }
-    }
 
-    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
-                                                                               
  int expireTimeInSeconds,
-                                                                               
  boolean authoritative) {
-        // validate ownership and redirect if current broker is not owner
-        PersistentTopic topic;
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-            topic = (PersistentTopic) getTopicReference(topicName);
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to expire messages for all subscription 
on topic {},"
-                                + " redirecting to other brokers.",
-                        clientAppId(), topicName, wae);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, wae);
-            return;
-        } catch (Exception e) {
-            log.error("[{}] Failed to expire messages for all subscription on 
topic {}",
-                    clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        topic.getReplicators().forEach((subName, replicator) -> {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (Throwable t) {
-                exception.set(t);
-            }
-        });
-
-        topic.getSubscriptions().forEach((subName, subscriber) -> {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (Throwable t) {
-                exception.set(t);
-            }
-        });
-
-        if (exception.get() != null) {
-            if (exception.get() instanceof WebApplicationException) {
-                WebApplicationException wae = (WebApplicationException) 
exception.get();
-                asyncResponse.resume(wae);
-                return;
-            } else {
-                asyncResponse.resume(new RestException(exception.get()));
-                return;
+                        })
+                ).exceptionally(e -> {
+                        Throwable throwable = e.getCause();
+                        log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}", clientAppId(),
+                                expireTimeInSeconds, topicName, throwable);
+                        asyncResponse.resume(new RestException(throwable));
+                        return null;
+                    });
+            } catch (Exception e) {

Review comment:
       This catch does not seem necessary?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,131 +3329,142 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(partitionMetadata -> {
+                        if (topicName.isPartitioned()) {
+                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                    expireTimeInSeconds, authoritative)
+                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                        } else {
+                            if (partitionMetadata.partitions > 0) {
+                                return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                    final List<CompletableFuture<Void>> 
futures = Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar()
-                                .getAdminClient()
-                                .topics()
-                                
.expireMessagesAsync(topicNamePartition.toString(),
-                                        subName, expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                }
+                                    // expire messages for each partition topic
+                                    for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                                        try {
+                                            futures.add(pulsar()
+                                                    .getAdminClient()
+                                                    .topics()
+                                                    
.expireMessagesAsync(topicNamePartition.toString(),
+                                                            subName, 
expireTimeInSeconds));
+                                        } catch (Exception e) {
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                    expireTimeInSeconds, 
topicNamePartition, e);
+                                            asyncResponse.resume(new 
RestException(e));
+                                            return;
+                                        }
+                                    }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                                    
FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                        if (exception != null) {
+                                            Throwable t = exception.getCause();
+                                            if (t instanceof 
NotFoundException) {
+                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                        "Subscription not 
found"));
+                                                return null;
+                                            } else {
+                                                log.error("[{}] Failed to 
expire messages up to {} on {}",
+                                                        clientAppId(), 
expireTimeInSeconds,
+                                                        topicName, t);
+                                                asyncResponse.resume(new 
RestException(t));
+                                                return null;
+                                            }
+                                        }
+                                        
asyncResponse.resume(Response.noContent().build());
+                                        return null;
+                                    });
+                                });
+                            } else {
+                                return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                        subName, expireTimeInSeconds, 
authoritative)
+                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                            }
                         }
-                    }
-
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                try {
-                    
internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (WebApplicationException wae) {
-                    asyncResponse.resume(wae);
-                    return;
-                } catch (Exception e) {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-                asyncResponse.resume(Response.noContent().build());
-            }
-        }
+                    })
+        ).exceptionally(e -> {
+            Throwable cause = e.getCause();
+            log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds, topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
     }
 
-    private void internalExpireMessagesByTimestampForSinglePartition(String 
subName, int expireTimeInSeconds,
+    private CompletableFuture<Void> 
internalExpireMessagesByTimestampForSinglePartitionAsync(
+            PartitionedTopicMetadata partitionMetadata, String subName, int 
expireTimeInSeconds,
             boolean authoritative) {
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned 
topic";
-            log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
-            throw new IllegalStateException(msg);
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName, subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Expire messages on a non-persistent topic is not 
allowed");
-        }
+            return FutureUtil.failedFuture(new IllegalStateException(msg));
+        } else {
+            final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
+            try {
+                validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                    .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                    .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                        if (t == null) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Topic not found"));
+                            return;
+                        }
+                        if (!(t instanceof PersistentTopic)) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Expire messages on a non-persistent topic 
is not allowed"));
+                            return;
+                        }
+                        PersistentTopic topic = (PersistentTopic) t;
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        try {
-            boolean issued;
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
-                issued = repl.expireMessages(expireTimeInSeconds);
-            } else {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
-                issued = sub.expireMessages(expireTimeInSeconds);
-            }
-            if (issued) {
-                log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(), expireTimeInSeconds, topicName,
-                        subName);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Expire message by timestamp not issued on topic 
{} for subscription {} due to ongoing "
-                            + "message expiration not finished or subscription 
almost catch up. If it's performed on "
-                            + "a partitioned topic operation might succeeded 
on other partitions, please check "
-                            + "stats of individual partition.", topicName, 
subName);
-                }
-                throw new RestException(Status.CONFLICT, "Expire message by 
timestamp not issued on topic "
-                + topicName + " for subscription " + subName + " due to 
ongoing message expiration not finished or "
-                + " subscription almost catch up. If it's performed on a 
partitioned topic operation might succeeded "
-                + "on other partitions, please check stats of individual 
partition.");
+                        boolean issued;
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator) 
topic
+                                    .getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            issued = repl.expireMessages(expireTimeInSeconds);
+                        } else {
+                            PersistentSubscription sub = 
topic.getSubscription(subName);
+                            checkNotNull(sub);
+                            issued = sub.expireMessages(expireTimeInSeconds);
+                        }
+                        if (issued) {
+                            log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(),
+                                    expireTimeInSeconds, topicName, subName);
+                            resultFuture.complete(__);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Expire message by timestamp not 
issued on topic {} for subscription {} "
+                                        + "due to ongoing message expiration 
not finished or subscription almost"
+                                        + " catch up. If it's performed on a 
partitioned topic operation might "
+                                        + "succeeded on other partitions, 
please check stats of individual "
+                                        + "partition.", topicName, subName);
+                            }
+                            resultFuture.completeExceptionally(new 
RestException(Status.CONFLICT, "Expire message "
+                                    + "by timestamp not issued on topic " + 
topicName + " for subscription "
+                                    + subName + " due to ongoing message 
expiration not finished or subscription "
+                                    + "almost catch  up. If it's performed on 
a partitioned topic operation might"
+                                    + " succeeded on other partitions, please 
check stats of individual partition."
+                            ));
+                            return;
+                        }
+                    })
+                    ).exceptionally(e -> {
+                    resultFuture.completeExceptionally(e);
+                    return null;
+                });
+            } catch (Exception e) {

Review comment:
       Is this catch necessary? 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,131 +3329,142 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(partitionMetadata -> {
+                        if (topicName.isPartitioned()) {
+                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                    expireTimeInSeconds, authoritative)
+                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                        } else {
+                            if (partitionMetadata.partitions > 0) {
+                                return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                    final List<CompletableFuture<Void>> 
futures = Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar()
-                                .getAdminClient()
-                                .topics()
-                                
.expireMessagesAsync(topicNamePartition.toString(),
-                                        subName, expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                }
+                                    // expire messages for each partition topic
+                                    for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                                        try {
+                                            futures.add(pulsar()
+                                                    .getAdminClient()
+                                                    .topics()
+                                                    
.expireMessagesAsync(topicNamePartition.toString(),
+                                                            subName, 
expireTimeInSeconds));
+                                        } catch (Exception e) {
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                    expireTimeInSeconds, 
topicNamePartition, e);
+                                            asyncResponse.resume(new 
RestException(e));
+                                            return;
+                                        }
+                                    }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                                    
FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                        if (exception != null) {
+                                            Throwable t = exception.getCause();
+                                            if (t instanceof 
NotFoundException) {
+                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                        "Subscription not 
found"));
+                                                return null;
+                                            } else {
+                                                log.error("[{}] Failed to 
expire messages up to {} on {}",
+                                                        clientAppId(), 
expireTimeInSeconds,
+                                                        topicName, t);
+                                                asyncResponse.resume(new 
RestException(t));
+                                                return null;
+                                            }
+                                        }
+                                        
asyncResponse.resume(Response.noContent().build());
+                                        return null;
+                                    });
+                                });
+                            } else {
+                                return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                        subName, expireTimeInSeconds, 
authoritative)
+                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                            }
                         }
-                    }
-
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                try {
-                    
internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (WebApplicationException wae) {
-                    asyncResponse.resume(wae);
-                    return;
-                } catch (Exception e) {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-                asyncResponse.resume(Response.noContent().build());
-            }
-        }
+                    })
+        ).exceptionally(e -> {
+            Throwable cause = e.getCause();

Review comment:
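       Can we use `FutureUtil.unwrapCompletionException(e)` here instead of `e.getCause()`? `getCause()` can return `null` (or the wrong exception) when `e` is not wrapped in a `CompletionException`.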

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,135 @@ protected void internalSkipMessages(String subName, 
int numMessages, boolean aut
 
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                    expireTimeInSeconds, authoritative);
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                partitionMetadata, expireTimeInSeconds, 
authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures =
+                                    
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
 
-                    // expire messages for each partition topic
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                            // expire messages for each partition topic
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                            .getAdminClient()
+                                            .topics()
+                                            
.expireMessagesForAllSubscriptionsAsync(
+                                                    
topicNamePartition.toString(), expireTimeInSeconds));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicNamePartition, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
+
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                                
asyncResponse.resume(Response.noContent().build());
+                                return null;
+                            });
+                        } else {
+                            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                    partitionMetadata, expireTimeInSeconds, 
authoritative);
+                        }
+                    }
+                }
+             )
+        ).exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed to expire messages for all subscription on 
topic {}", clientAppId(), topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
+
+    }
+
+    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                                               
  PartitionedTopicMetadata
+                                                                               
  partitionMetadata,
+                                                                               
  int expireTimeInSeconds,
+                                                                               
  boolean authoritative) {
+        try {
+            // validate ownership and redirect if current broker is not owner
+            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                    if (t == null) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.NOT_FOUND,
+                                "Topic not found"));
+                        return;
+                    }
+                    if (!(t instanceof PersistentTopic)) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Expire messages for all subscriptions on a 
non-persistent topic is not allowed"));
+                        return;
+                    }
+                    PersistentTopic topic = (PersistentTopic) t;
+                    final List<CompletableFuture<Void>> futures =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size());
+                    List<String> subNames =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size()
+                                    + (int) topic.getSubscriptions().size());
+                    subNames.addAll(topic.getReplicators().keys());
+                    subNames.addAll(topic.getSubscriptions().keys());
+                    for (int i = 0; i < subNames.size(); i++) {
                         try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    .expireMessagesForAllSubscriptionsAsync(
-                                            topicNamePartition.toString(), 
expireTimeInSeconds));
+                            
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                    subNames.get(i), expireTimeInSeconds, 
authoritative));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicNamePartition, e);
+                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds, 
topicName, e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
                     }
 
                     FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
                         if (exception != null) {
-                            Throwable t = exception.getCause();
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
+                            Throwable throwable = exception.getCause();

Review comment:
       Can we use `FutureUtil.unwrapCompletionException(exception)` here instead of `exception.getCause()`?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,131 +3329,142 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(partitionMetadata -> {
+                        if (topicName.isPartitioned()) {
+                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                    expireTimeInSeconds, authoritative)
+                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                        } else {
+                            if (partitionMetadata.partitions > 0) {
+                                return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                    final List<CompletableFuture<Void>> 
futures = Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar()
-                                .getAdminClient()
-                                .topics()
-                                
.expireMessagesAsync(topicNamePartition.toString(),
-                                        subName, expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                }
+                                    // expire messages for each partition topic
+                                    for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                                        try {
+                                            futures.add(pulsar()
+                                                    .getAdminClient()
+                                                    .topics()
+                                                    
.expireMessagesAsync(topicNamePartition.toString(),
+                                                            subName, 
expireTimeInSeconds));
+                                        } catch (Exception e) {
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                    expireTimeInSeconds, 
topicNamePartition, e);
+                                            asyncResponse.resume(new 
RestException(e));
+                                            return;
+                                        }
+                                    }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                                    
FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                        if (exception != null) {
+                                            Throwable t = exception.getCause();
+                                            if (t instanceof 
NotFoundException) {
+                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                        "Subscription not 
found"));
+                                                return null;
+                                            } else {
+                                                log.error("[{}] Failed to 
expire messages up to {} on {}",
+                                                        clientAppId(), 
expireTimeInSeconds,
+                                                        topicName, t);
+                                                asyncResponse.resume(new 
RestException(t));
+                                                return null;
+                                            }
+                                        }
+                                        
asyncResponse.resume(Response.noContent().build());
+                                        return null;
+                                    });
+                                });
+                            } else {
+                                return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                        subName, expireTimeInSeconds, 
authoritative)
+                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                            }
                         }
-                    }
-
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                try {
-                    
internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (WebApplicationException wae) {
-                    asyncResponse.resume(wae);
-                    return;
-                } catch (Exception e) {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-                asyncResponse.resume(Response.noContent().build());
-            }
-        }
+                    })
+        ).exceptionally(e -> {
+            Throwable cause = e.getCause();
+            log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds, topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
     }
 
-    private void internalExpireMessagesByTimestampForSinglePartition(String 
subName, int expireTimeInSeconds,
+    private CompletableFuture<Void> 
internalExpireMessagesByTimestampForSinglePartitionAsync(
+            PartitionedTopicMetadata partitionMetadata, String subName, int 
expireTimeInSeconds,
             boolean authoritative) {
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned 
topic";
-            log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
-            throw new IllegalStateException(msg);
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName, subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Expire messages on a non-persistent topic is not 
allowed");
-        }
+            return FutureUtil.failedFuture(new IllegalStateException(msg));
+        } else {
+            final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
+            try {
+                validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                    .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                    .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                        if (t == null) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Topic not found"));
+                            return;
+                        }
+                        if (!(t instanceof PersistentTopic)) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Expire messages on a non-persistent topic 
is not allowed"));
+                            return;
+                        }
+                        PersistentTopic topic = (PersistentTopic) t;
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        try {
-            boolean issued;
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
-                issued = repl.expireMessages(expireTimeInSeconds);
-            } else {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
-                issued = sub.expireMessages(expireTimeInSeconds);
-            }
-            if (issued) {
-                log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(), expireTimeInSeconds, topicName,
-                        subName);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Expire message by timestamp not issued on topic 
{} for subscription {} due to ongoing "
-                            + "message expiration not finished or subscription 
almost catch up. If it's performed on "
-                            + "a partitioned topic operation might succeeded 
on other partitions, please check "
-                            + "stats of individual partition.", topicName, 
subName);
-                }
-                throw new RestException(Status.CONFLICT, "Expire message by 
timestamp not issued on topic "
-                + topicName + " for subscription " + subName + " due to 
ongoing message expiration not finished or "
-                + " subscription almost catch up. If it's performed on a 
partitioned topic operation might succeeded "
-                + "on other partitions, please check stats of individual 
partition.");
+                        boolean issued;
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator) 
topic
+                                    .getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            issued = repl.expireMessages(expireTimeInSeconds);
+                        } else {
+                            PersistentSubscription sub = 
topic.getSubscription(subName);
+                            checkNotNull(sub);
+                            issued = sub.expireMessages(expireTimeInSeconds);
+                        }
+                        if (issued) {
+                            log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(),
+                                    expireTimeInSeconds, topicName, subName);
+                            resultFuture.complete(__);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Expire message by timestamp not 
issued on topic {} for subscription {} "
+                                        + "due to ongoing message expiration 
not finished or subscription almost"
+                                        + " catch up. If it's performed on a 
partitioned topic operation might "
+                                        + "succeeded on other partitions, 
please check stats of individual "
+                                        + "partition.", topicName, subName);
+                            }
+                            resultFuture.completeExceptionally(new 
RestException(Status.CONFLICT, "Expire message "
+                                    + "by timestamp not issued on topic " + 
topicName + " for subscription "
+                                    + subName + " due to ongoing message 
expiration not finished or subscription "
+                                    + "almost catch  up. If it's performed on 
a partitioned topic operation might"
+                                    + " succeeded on other partitions, please 
check stats of individual partition."
+                            ));
+                            return;
+                        }
+                    })
+                    ).exceptionally(e -> {

Review comment:
       Can we use `FutureUtil.unwrapCompletionException(e)` here before completing `resultFuture` exceptionally?
   Also, do we need an error log in this handler?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,131 +3329,142 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(partitionMetadata -> {
+                        if (topicName.isPartitioned()) {
+                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                    expireTimeInSeconds, authoritative)
+                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                        } else {
+                            if (partitionMetadata.partitions > 0) {
+                                return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                    final List<CompletableFuture<Void>> 
futures = Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar()
-                                .getAdminClient()
-                                .topics()
-                                
.expireMessagesAsync(topicNamePartition.toString(),
-                                        subName, expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                }
+                                    // expire messages for each partition topic
+                                    for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                                        try {
+                                            futures.add(pulsar()
+                                                    .getAdminClient()
+                                                    .topics()
+                                                    
.expireMessagesAsync(topicNamePartition.toString(),
+                                                            subName, 
expireTimeInSeconds));
+                                        } catch (Exception e) {
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                    expireTimeInSeconds, 
topicNamePartition, e);
+                                            asyncResponse.resume(new 
RestException(e));
+                                            return;
+                                        }
+                                    }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                                    
FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                        if (exception != null) {
+                                            Throwable t = exception.getCause();
+                                            if (t instanceof 
NotFoundException) {
+                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                        "Subscription not 
found"));
+                                                return null;
+                                            } else {
+                                                log.error("[{}] Failed to 
expire messages up to {} on {}",
+                                                        clientAppId(), 
expireTimeInSeconds,
+                                                        topicName, t);
+                                                asyncResponse.resume(new 
RestException(t));
+                                                return null;
+                                            }
+                                        }
+                                        
asyncResponse.resume(Response.noContent().build());
+                                        return null;
+                                    });
+                                });
+                            } else {
+                                return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                        subName, expireTimeInSeconds, 
authoritative)
+                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                            }
                         }
-                    }
-
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                try {
-                    
internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (WebApplicationException wae) {
-                    asyncResponse.resume(wae);
-                    return;
-                } catch (Exception e) {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-                asyncResponse.resume(Response.noContent().build());
-            }
-        }
+                    })
+        ).exceptionally(e -> {
+            Throwable cause = e.getCause();
+            log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds, topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
     }
 
-    private void internalExpireMessagesByTimestampForSinglePartition(String 
subName, int expireTimeInSeconds,
+    private CompletableFuture<Void> 
internalExpireMessagesByTimestampForSinglePartitionAsync(
+            PartitionedTopicMetadata partitionMetadata, String subName, int 
expireTimeInSeconds,
             boolean authoritative) {
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned 
topic";
-            log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
-            throw new IllegalStateException(msg);
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName, subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Expire messages on a non-persistent topic is not 
allowed");
-        }
+            return FutureUtil.failedFuture(new IllegalStateException(msg));
+        } else {
+            final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
+            try {
+                validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                    .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                    .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                        if (t == null) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Topic not found"));
+                            return;
+                        }
+                        if (!(t instanceof PersistentTopic)) {
+                            resultFuture.completeExceptionally(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Expire messages on a non-persistent topic 
is not allowed"));
+                            return;
+                        }
+                        PersistentTopic topic = (PersistentTopic) t;
 
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        try {
-            boolean issued;
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
-                issued = repl.expireMessages(expireTimeInSeconds);
-            } else {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
-                issued = sub.expireMessages(expireTimeInSeconds);
-            }
-            if (issued) {
-                log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(), expireTimeInSeconds, topicName,
-                        subName);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Expire message by timestamp not issued on topic 
{} for subscription {} due to ongoing "
-                            + "message expiration not finished or subscription 
almost catch up. If it's performed on "
-                            + "a partitioned topic operation might succeeded 
on other partitions, please check "
-                            + "stats of individual partition.", topicName, 
subName);
-                }
-                throw new RestException(Status.CONFLICT, "Expire message by 
timestamp not issued on topic "
-                + topicName + " for subscription " + subName + " due to 
ongoing message expiration not finished or "
-                + " subscription almost catch up. If it's performed on a 
partitioned topic operation might succeeded "
-                + "on other partitions, please check stats of individual 
partition.");
+                        boolean issued;
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator) 
topic
+                                    .getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            issued = repl.expireMessages(expireTimeInSeconds);
+                        } else {
+                            PersistentSubscription sub = 
topic.getSubscription(subName);
+                            checkNotNull(sub);
+                            issued = sub.expireMessages(expireTimeInSeconds);
+                        }
+                        if (issued) {
+                            log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(),
+                                    expireTimeInSeconds, topicName, subName);
+                            resultFuture.complete(__);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Expire message by timestamp not 
issued on topic {} for subscription {} "
+                                        + "due to ongoing message expiration 
not finished or subscription almost"
+                                        + " catch up. If it's performed on a 
partitioned topic operation might "
+                                        + "succeeded on other partitions, 
please check stats of individual "
+                                        + "partition.", topicName, subName);
+                            }
+                            resultFuture.completeExceptionally(new 
RestException(Status.CONFLICT, "Expire message "
+                                    + "by timestamp not issued on topic " + 
topicName + " for subscription "
+                                    + subName + " due to ongoing message 
expiration not finished or subscription "
+                                    + "almost catch  up. If it's performed on 
a partitioned topic operation might"
+                                    + " succeeded on other partitions, please 
check stats of individual partition."
+                            ));
+                            return;
+                        }
+                    })
+                    ).exceptionally(e -> {
+                    resultFuture.completeExceptionally(e);
+                    return null;
+                });
+            } catch (Exception e) {
+                resultFuture.completeExceptionally(e);
+                return null;

Review comment:
       This should return `resultFuture` rather than `null`, so that the caller receives the exceptionally completed future.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,135 @@ protected void internalSkipMessages(String subName, 
int numMessages, boolean aut
 
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                    expireTimeInSeconds, authoritative);
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenCompose(__ ->
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                partitionMetadata, expireTimeInSeconds, 
authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures =
+                                    
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
 
-                    // expire messages for each partition topic
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                            // expire messages for each partition topic
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                            .getAdminClient()
+                                            .topics()
+                                            
.expireMessagesForAllSubscriptionsAsync(
+                                                    
topicNamePartition.toString(), expireTimeInSeconds));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicNamePartition, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
+
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                                
asyncResponse.resume(Response.noContent().build());
+                                return null;
+                            });
+                        } else {
+                            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                    partitionMetadata, expireTimeInSeconds, 
authoritative);
+                        }
+                    }
+                }
+             )
+        ).exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed to expire messages for all subscription on 
topic {}", clientAppId(), topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
+
+    }
+
+    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                                               
  PartitionedTopicMetadata
+                                                                               
  partitionMetadata,
+                                                                               
  int expireTimeInSeconds,
+                                                                               
  boolean authoritative) {
+        try {
+            // validate ownership and redirect if current broker is not owner
+            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
+                    if (t == null) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.NOT_FOUND,
+                                "Topic not found"));
+                        return;
+                    }
+                    if (!(t instanceof PersistentTopic)) {
+                        resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Expire messages for all subscriptions on a 
non-persistent topic is not allowed"));
+                        return;
+                    }
+                    PersistentTopic topic = (PersistentTopic) t;
+                    final List<CompletableFuture<Void>> futures =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size());
+                    List<String> subNames =
+                            Lists.newArrayListWithCapacity((int) 
topic.getReplicators().size()
+                                    + (int) topic.getSubscriptions().size());
+                    subNames.addAll(topic.getReplicators().keys());
+                    subNames.addAll(topic.getSubscriptions().keys());
+                    for (int i = 0; i < subNames.size(); i++) {
                         try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    .expireMessagesForAllSubscriptionsAsync(
-                                            topicNamePartition.toString(), 
expireTimeInSeconds));
+                            
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+                                    subNames.get(i), expireTimeInSeconds, 
authoritative));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicNamePartition, e);
+                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds, 
topicName, e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
                     }
 
                     FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
                         if (exception != null) {
-                            Throwable t = exception.getCause();
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
+                            Throwable throwable = exception.getCause();
+                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds, 
topicName, throwable);
+                            asyncResponse.resume(new RestException(throwable));
                             return null;
                         }
-
                         asyncResponse.resume(Response.noContent().build());
                         return null;
                     });
-                } else {
-                    
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                            expireTimeInSeconds, authoritative);
-                }
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, ex);
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
-            });
-        }
-    }
 
-    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
-                                                                               
  int expireTimeInSeconds,
-                                                                               
  boolean authoritative) {
-        // validate ownership and redirect if current broker is not owner
-        PersistentTopic topic;
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-            topic = (PersistentTopic) getTopicReference(topicName);
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to expire messages for all subscription 
on topic {},"
-                                + " redirecting to other brokers.",
-                        clientAppId(), topicName, wae);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, wae);
-            return;
-        } catch (Exception e) {
-            log.error("[{}] Failed to expire messages for all subscription on 
topic {}",
-                    clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        topic.getReplicators().forEach((subName, replicator) -> {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (Throwable t) {
-                exception.set(t);
-            }
-        });
-
-        topic.getSubscriptions().forEach((subName, subscriber) -> {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (Throwable t) {
-                exception.set(t);
-            }
-        });
-
-        if (exception.get() != null) {
-            if (exception.get() instanceof WebApplicationException) {
-                WebApplicationException wae = (WebApplicationException) 
exception.get();
-                asyncResponse.resume(wae);
-                return;
-            } else {
-                asyncResponse.resume(new RestException(exception.get()));
-                return;
+                        })
+                ).exceptionally(e -> {
+                        Throwable throwable = e.getCause();

Review comment:
       Can we use `FutureUtil.unwrapCompletionException(e)` here instead of `e.getCause()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to