Technoboy- commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r792460904



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,141 @@ protected void internalSkipMessages(String subName, 
int numMessages, boolean aut
 
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
-                    expireTimeInSeconds, authoritative);
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenAccept(__ ->
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                partitionMetadata, expireTimeInSeconds, 
authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures =
+                                    
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
 
-                    // expire messages for each partition topic
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                            // expire messages for each partition topic
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                            .getAdminClient()
+                                            .topics()
+                                            
.expireMessagesForAllSubscriptionsAsync(
+                                                    
topicNamePartition.toString(), expireTimeInSeconds));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicNamePartition, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
+
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
+                                            clientAppId(), expireTimeInSeconds,
+                                            topicName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                                
asyncResponse.resume(Response.noContent().build());
+                                return null;
+                            });
+                        } else {
+                            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                                    partitionMetadata, expireTimeInSeconds, 
authoritative);
+                        }
+                    }
+                }
+             )
+        ).exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed to expire messages for all subscription on 
topic {}", clientAppId(), topicName,
+                    cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });
+
+    }
+
+    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                                               
  PartitionedTopicMetadata
+                                                                               
          partitionMetadata,
+                                                                               
  int expireTimeInSeconds,
+                                                                               
  boolean authoritative) {
+        try {
+            // validate ownership and redirect if current broker is not owner
+            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenAccept(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {

Review comment:
       I think we should use thenCompose here:
   .thenCompose(__ -> getTopicReferenceAsync(topicName)).xxx




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to