AnonHxy commented on code in PR #16464:
URL: https://github.com/apache/pulsar/pull/16464#discussion_r918478698


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4920,73 +4920,45 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
         resumeAsyncResponseExceptionally(asyncResponse, cause);
     }
 
-    protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+    protected CompletableFuture<Void> 
internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
+        return validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenCompose(__ -> getTopicReferenceAsync(topicName))
+            .thenCompose(Topic::truncate);
     }
 
-    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
+    protected CompletableFuture<Void> internalTruncateTopicAsync(boolean 
authoritative) {
 
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
-            internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+            return internalTruncateNonPartitionedTopicAsync(authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
+            return getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenCompose(meta -> {
                 if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>(meta.partitions);
                     for (int i = 0; i < meta.partitions; i++) {
                         TopicName topicNamePartition = 
topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics()
+                            futures.add(
+                                pulsar().getAdminClient().topics()
                                 .truncateAsync(topicNamePartition.toString()));
                         } catch (Exception e) {
                             log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
+                            return FutureUtil.failedFuture(new 
RestException(e));
                         }
                     }
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
+                    return FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
                         if (exception != null) {
                             Throwable th = exception.getCause();
                             if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicName, exception);
-                                asyncResponse.resume(new 
RestException(exception));
+                                throw new RestException(Status.NOT_FOUND, 
th.getMessage());

Review Comment:
   Fixed. PTAL @Technoboy-



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to