codelipenghui commented on a change in pull request #13845:
URL: https://github.com/apache/pulsar/pull/13845#discussion_r788346085



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();

Review comment:
       Since we know the number of partitions, we should specify the initial 
capacity of the array list to avoid resizing it as elements are added.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                               for (int i = 0; i < meta.partitions; i++) {
+                                   TopicName topicNamePartition = 
topicName.getPartition(i);
+                                   try {
+                                       
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+                                               topicNamePartition.toString()));
+                                   } catch (Exception e) {
+                                       log.error("[{}] Failed to unload topic 
{}", clientAppId(),
+                                               topicNamePartition, e);
+                                       asyncResponse.resume(new 
RestException(e));
+                                       return;
+                                   }
+                               }
+
+                               FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                   if (exception != null) {
+                                       Throwable th = exception.getCause();
+                                       if (th instanceof NotFoundException) {
+                                           asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                                       } else if (th instanceof 
WebApplicationException) {
+                                           asyncResponse.resume(th);
+                                       } else {
+                                           log.error("[{}] Failed to unload 
topic {}", clientAppId(), topicName,
+                                                   exception);
+                                           asyncResponse.resume(new 
RestException(exception));
+                                       }
+                                   } else {
+                                       
asyncResponse.resume(Response.noContent().build());
+                                   }
+                                   return null;
+                               });
+                           } else {
+                               
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+                           }
+                       }).exceptionally(t -> {
+                           log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, t);
+                           if (t instanceof WebApplicationException) {
+                               asyncResponse.resume(t);
+                           } else {
+                               asyncResponse.resume(new RestException(t));
+                           }
+                           return null;
+                       });
+           }
+       }).exceptionally(ex -> {
+           Throwable cause = ex.getCause();
+           log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, cause);

Review comment:
       The log message should be 
`Failed to validate the global namespace ownership while unloading topic {}`

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                               for (int i = 0; i < meta.partitions; i++) {
+                                   TopicName topicNamePartition = 
topicName.getPartition(i);
+                                   try {
+                                       
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+                                               topicNamePartition.toString()));
+                                   } catch (Exception e) {
+                                       log.error("[{}] Failed to unload topic 
{}", clientAppId(),
+                                               topicNamePartition, e);
+                                       asyncResponse.resume(new 
RestException(e));
+                                       return;
+                                   }
+                               }
+
+                               FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                   if (exception != null) {
+                                       Throwable th = exception.getCause();
+                                       if (th instanceof NotFoundException) {
+                                           asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                                       } else if (th instanceof 
WebApplicationException) {
+                                           asyncResponse.resume(th);
+                                       } else {
+                                           log.error("[{}] Failed to unload 
topic {}", clientAppId(), topicName,
+                                                   exception);
+                                           asyncResponse.resume(new 
RestException(exception));
+                                       }
+                                   } else {
+                                       
asyncResponse.resume(Response.noContent().build());
+                                   }
+                                   return null;
+                               });
+                           } else {
+                               
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+                           }
+                       }).exceptionally(t -> {
+                           log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, t);

Review comment:
       Should be `Failed to get partitioned metadata while unloading topic {}`

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -969,50 +972,42 @@ protected void internalUnloadTopic(AsyncResponse 
asyncResponse, boolean authorit
             });
     }
 
-    private void internalUnloadNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        try {
-            validateTopicOperation(topicName, TopicOperation.UNLOAD);
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {},{}", clientAppId(), 
topicName, e.getMessage());
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenCompose(topic -> topic.close(false))
-                .thenRun(() -> {
-                    log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
-                    asyncResponse.resume(Response.noContent().build());
-                })
+    private void internalUnloadNonPartitionedTopicAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative)
+                        .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                        .thenCompose(topic -> topic.close(false))
+                        .thenRun(() -> {
+                            log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
+                            asyncResponse.resume(Response.noContent().build());
+                        }))
                 .exceptionally(ex -> {
-                    log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex.getMessage());
-                    asyncResponse.resume(ex.getCause());
+                    Throwable cause = ex.getCause();
+                    log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, cause);
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
                     return null;
                 });
     }
 
-    private void internalUnloadTransactionCoordinator(AsyncResponse 
asyncResponse, boolean authoritative) {
-        try {
-            validateTopicOperation(topicName, TopicOperation.UNLOAD);
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload tc {},{}", clientAppId(), 
topicName.getPartitionIndex(), e.getMessage());
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(v -> pulsar()
-                .getTransactionMetadataStoreService()
-                
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
-                .thenRun(() -> {
-                    log.info("[{}] Successfully unloaded tc {}", 
clientAppId(), topicName.getPartitionIndex());
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to unload tc {}, {}", 
clientAppId(), topicName.getPartitionIndex(),
-                    ex.getMessage());
-            asyncResponse.resume(ex.getCause());
-            return null;
-        });
+    private void internalUnloadTransactionCoordinatorAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)

Review comment:
       Hmm, it looks like we were checking the wrong permission before: for 
the transaction coordinator, only the superuser should be allowed to unload 
it. @congbobo184 Could you please help confirm?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -969,50 +972,42 @@ protected void internalUnloadTopic(AsyncResponse 
asyncResponse, boolean authorit
             });
     }
 
-    private void internalUnloadNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        try {
-            validateTopicOperation(topicName, TopicOperation.UNLOAD);
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {},{}", clientAppId(), 
topicName, e.getMessage());
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenCompose(topic -> topic.close(false))
-                .thenRun(() -> {
-                    log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
-                    asyncResponse.resume(Response.noContent().build());
-                })
+    private void internalUnloadNonPartitionedTopicAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative)
+                        .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                        .thenCompose(topic -> topic.close(false))
+                        .thenRun(() -> {
+                            log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
+                            asyncResponse.resume(Response.noContent().build());
+                        }))
                 .exceptionally(ex -> {
-                    log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex.getMessage());
-                    asyncResponse.resume(ex.getCause());
+                    Throwable cause = ex.getCause();
+                    log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, cause);
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
                     return null;
                 });
     }
 
-    private void internalUnloadTransactionCoordinator(AsyncResponse 
asyncResponse, boolean authoritative) {
-        try {
-            validateTopicOperation(topicName, TopicOperation.UNLOAD);
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload tc {},{}", clientAppId(), 
topicName.getPartitionIndex(), e.getMessage());
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(v -> pulsar()
-                .getTransactionMetadataStoreService()
-                
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
-                .thenRun(() -> {
-                    log.info("[{}] Successfully unloaded tc {}", 
clientAppId(), topicName.getPartitionIndex());
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to unload tc {}, {}", 
clientAppId(), topicName.getPartitionIndex(),
-                    ex.getMessage());
-            asyncResponse.resume(ex.getCause());
-            return null;
-        });
+    private void internalUnloadTransactionCoordinatorAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)

Review comment:
       It's OK to push a separate PR to fix this one, so that this PR stays 
focused on what it is meant to do.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }

Review comment:
       ```suggestion
           CompletableFuture<Void> future = null;
           if (topicName.isGlobal()) {
               future = validateGlobalNamespaceOwnershipAsync(namespaceName);
           } else {
               future = CompletableFuture.completedFuture(null);
           }
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{

Review comment:
   ```suggestion
          future.thenAccept(__ -> {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to