Jason918 commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r789695562



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3445,6 +3465,68 @@ private void 
internalExpireMessagesByTimestampForSinglePartition(String subName,
         }
     }
 
+    private CompletableFuture<Void> 
internalExpireMessagesByTimestampForSinglePartitionAsync(

Review comment:
       `internalExpireMessagesByTimestampForSinglePartition` should be 
implemented by this async method.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,74 +3317,94 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenAccept(__ -> {
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)

Review comment:
       Why do we move "getPartitionedTopicMetadataAsync" before `if 
(topicName.isPartitioned())` ? 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,74 +3317,94 @@ protected void 
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            try {
-                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            } catch (Exception e) {
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            asyncResponse.resume(Response.noContent().build());
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         } else {
-            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-            if (partitionMetadata.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenAccept(__ -> {
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenAccept(partitionMetadata -> {
+                        if (topicName.isPartitioned()) {
+                            
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                    expireTimeInSeconds, authoritative)
+                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
+                                    .exceptionally(e -> {
+                                        Throwable cause = e.getCause();
+                                        log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                expireTimeInSeconds, 
topicName, cause);
+                                        
resumeAsyncResponseExceptionally(asyncResponse, cause);
+                                        return null;
+                                    });
+                        } else {
+                            if (partitionMetadata.partitions > 0) {
+                                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                // expire messages for each partition topic
-                for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        futures.add(pulsar()
-                                .getAdminClient()
-                                .topics()
-                                
.expireMessagesAsync(topicNamePartition.toString(),
-                                        subName, expireTimeInSeconds));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                }
+                                // expire messages for each partition topic
+                                for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                    TopicName topicNamePartition = 
topicName.getPartition(i);
+                                    try {
+                                        futures.add(pulsar()
+                                                .getAdminClient()
+                                                .topics()
+                                                
.expireMessagesAsync(topicNamePartition.toString(),
+                                                        subName, 
expireTimeInSeconds));
+                                    } catch (Exception e) {
+                                        log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                expireTimeInSeconds, 
topicNamePartition, e);
+                                        asyncResponse.resume(new 
RestException(e));
+                                        return;
+                                    }
+                                }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable t = exception.getCause();
-                        if (t instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                            return null;
-                        } else {
-                            log.error("[{}] Failed to expire messages up to {} 
on {}",
-                                    clientAppId(), expireTimeInSeconds,
-                                    topicName, t);
-                            asyncResponse.resume(new RestException(t));
-                            return null;
+                                FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                    if (exception != null) {
+                                        Throwable t = exception.getCause();
+                                        if (t instanceof NotFoundException) {
+                                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                                    "Subscription not found"));
+                                            return null;
+                                        } else {
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}",
+                                                    clientAppId(), 
expireTimeInSeconds,
+                                                    topicName, t);
+                                            asyncResponse.resume(new 
RestException(t));
+                                            return null;
+                                        }
+                                    }
+                                    
asyncResponse.resume(Response.noContent().build());
+                                    return null;
+                                });
+                            } else {
+                                
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
+                                        expireTimeInSeconds, authoritative)
+                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
+                                        .exceptionally(e -> {
+                                            Throwable cause = e.getCause();
+                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                    expireTimeInSeconds, 
topicName, cause);
+                                            
resumeAsyncResponseExceptionally(asyncResponse, cause);
+                                            return null;
+                                        });
+                            }
                         }
-                    }
+                    }).exceptionally(e -> {
+                        Throwable cause = e.getCause();
+                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(),
+                                expireTimeInSeconds, topicName, cause);
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                        return null;
+                    });
+        }).exceptionally(e -> {
+            Throwable cause = e.getCause();
+            log.error("[{}] Failed to validate global namespace ownership to 
expire messages up to {} on {}",
+                    clientAppId(), expireTimeInSeconds, topicName, cause);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
+            return null;
+        });

Review comment:
       Can we merge these two exceptionally ? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to