AnonHxy commented on a change in pull request #13901:
URL: https://github.com/apache/pulsar/pull/13901#discussion_r790138568



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+        future.thenRun(() -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenRun(() -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
+            .thenRun(() -> {
+                // If the topic name is a partition name, no need to get 
partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
+                } else {
+                    getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAccept(partitionMetadata -> {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    
.skipAllMessagesAsync(topicNamePartition.toString(),
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                        .getAdminClient()
+                                        .topics()
+                                        
.skipAllMessagesAsync(topicNamePartition.toString(),
                                             subName));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to skip all messages {} {}",
-                                    clientAppId(), topicNamePartition, 
subName, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to skip all 
messages {} {}",
+                                        clientAppId(), topicNamePartition, 
subName, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
 
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable t = exception.getCause();
-                            if (t instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                                return null;
-                            } else {
-                                log.error("[{}] Failed to skip all messages {} 
{}",
-                                        clientAppId(), topicName, subName, t);
-                                asyncResponse.resume(new RestException(t));
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    if (t instanceof NotFoundException) {
+                                        asyncResponse.resume(
+                                            new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                                        return null;
+                                    } else {
+                                        log.error("[{}] Failed to skip all 
messages {} {}",
+                                            clientAppId(), topicName, subName, 
t);
+                                        asyncResponse.resume(new 
RestException(t));
+                                        return null;
+                                    }
+                                }
+
+                                
asyncResponse.resume(Response.noContent().build());
                                 return null;
-                            }
+                            });
+                        } else {
+                            
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
+                    }).exceptionally(ex -> {
+                        log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
+                            clientAppId(), subName, topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
-                } else {
-                    
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, 
authoritative);
                 }
             }).exceptionally(ex -> {
                 log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",

Review comment:
       OK, I see — `FutureUtil.unwrapCompletionException` is a better method to use here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to