mattisonchao commented on a change in pull request #13845:
URL: https://github.com/apache/pulsar/pull/13845#discussion_r788363349



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                               for (int i = 0; i < meta.partitions; i++) {
+                                   TopicName topicNamePartition = 
topicName.getPartition(i);
+                                   try {
+                                       
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+                                               topicNamePartition.toString()));
+                                   } catch (Exception e) {
+                                       log.error("[{}] Failed to unload topic 
{}", clientAppId(),
+                                               topicNamePartition, e);
+                                       asyncResponse.resume(new 
RestException(e));
+                                       return;
+                                   }
+                               }
+
+                               FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                   if (exception != null) {
+                                       Throwable th = exception.getCause();
+                                       if (th instanceof NotFoundException) {
+                                           asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                                       } else if (th instanceof 
WebApplicationException) {
+                                           asyncResponse.resume(th);
+                                       } else {
+                                           log.error("[{}] Failed to unload 
topic {}", clientAppId(), topicName,
+                                                   exception);
+                                           asyncResponse.resume(new 
RestException(exception));
+                                       }
+                                   } else {
+                                       
asyncResponse.resume(Response.noContent().build());
+                                   }
+                                   return null;
+                               });
+                           } else {
+                               
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+                           }
+                       }).exceptionally(t -> {
+                           log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, t);

Review comment:
       Fixed.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();

Review comment:
       Fixed.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void 
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        try {
-            if (topicName.isGlobal()) {
-                validateGlobalNamespaceOwnership(namespaceName);
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
-                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
-            } else {
-                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-            }
-        } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(meta -> {
-                        if (meta.partitions > 0) {
-                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                            for (int i = 0; i < meta.partitions; i++) {
-                                TopicName topicNamePartition = 
topicName.getPartition(i);
-                                try {
-                                    
futures.add(pulsar().getAdminClient().topics().unloadAsync(
-                                            topicNamePartition.toString()));
-                                } catch (Exception e) {
-                                    log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicNamePartition, e);
-                                    asyncResponse.resume(new RestException(e));
-                                    return;
-                                }
-                            }
-
-                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                                if (exception != null) {
-                                    Throwable th = exception.getCause();
-                                    if (th instanceof NotFoundException) {
-                                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                                    } else if (th instanceof 
WebApplicationException) {
-                                        asyncResponse.resume(th);
-                                    } else {
-                                        log.error("[{}] Failed to unload topic 
{}", clientAppId(), topicName,
-                                                exception);
-                                        asyncResponse.resume(new 
RestException(exception));
-                                    }
-                                } else {
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }
-                                return null;
-                            });
-                        } else {
-                            internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
-                        }
-                    }).exceptionally(t -> {
-                log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, t);
-                if (t instanceof WebApplicationException) {
-                    asyncResponse.resume(t);
-                } else {
-                    asyncResponse.resume(new RestException(t));
-                }
-                return null;
-            });
-        }
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        }
+       future.thenAccept(__ ->{
+           // If the topic name is a partition name, no need to get partition 
topic metadata again
+           if (topicName.isPartitioned()) {
+               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                   internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
+               } else {
+                   internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
+               }
+           } else {
+               getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                       .thenAccept(meta -> {
+                           if (meta.partitions > 0) {
+                               final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                               for (int i = 0; i < meta.partitions; i++) {
+                                   TopicName topicNamePartition = 
topicName.getPartition(i);
+                                   try {
+                                       
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+                                               topicNamePartition.toString()));
+                                   } catch (Exception e) {
+                                       log.error("[{}] Failed to unload topic 
{}", clientAppId(),
+                                               topicNamePartition, e);
+                                       asyncResponse.resume(new 
RestException(e));
+                                       return;
+                                   }
+                               }
+
+                               FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                   if (exception != null) {
+                                       Throwable th = exception.getCause();
+                                       if (th instanceof NotFoundException) {
+                                           asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                                       } else if (th instanceof 
WebApplicationException) {
+                                           asyncResponse.resume(th);
+                                       } else {
+                                           log.error("[{}] Failed to unload 
topic {}", clientAppId(), topicName,
+                                                   exception);
+                                           asyncResponse.resume(new 
RestException(exception));
+                                       }
+                                   } else {
+                                       
asyncResponse.resume(Response.noContent().build());
+                                   }
+                                   return null;
+                               });
+                           } else {
+                               
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+                           }
+                       }).exceptionally(t -> {
+                           log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, t);
+                           if (t instanceof WebApplicationException) {
+                               asyncResponse.resume(t);
+                           } else {
+                               asyncResponse.resume(new RestException(t));
+                           }
+                           return null;
+                       });
+           }
+       }).exceptionally(ex -> {
+           Throwable cause = ex.getCause();
+           log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, cause);

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to