Technoboy- commented on a change in pull request #13805:
URL: https://github.com/apache/pulsar/pull/13805#discussion_r791415791



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -560,156 +561,130 @@ protected PartitionedTopicMetadata 
internalGetPartitionedMetadata(boolean author
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, 
boolean authoritative,
                                                   boolean force, boolean 
deleteSchema) {
-        try {
-            validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC);
-            validateTopicOwnership(topicName, authoritative);
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to delete partitioned topic {}, 
redirecting to other brokers.",
-                        clientAppId(), topicName, wae);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, wae);
-            return;
-        } catch (Exception e) {
-            log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMeta
 -> {
-            final int numPartitions = partitionMeta.partitions;
-            if (numPartitions > 0) {
-                final AtomicInteger count = new AtomicInteger(numPartitions);
-                if (deleteSchema) {
-                    count.incrementAndGet();
-                    
pulsar().getBrokerService().deleteSchemaStorage(topicName.getPartition(0).toString())
-                            .whenComplete((r, ex) -> {
-                                if (ex != null) {
-                                    log.warn("Failed to delete schema storage 
of topic: {}", topicName);
-                                }
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
-                            });
-                }
-                // delete authentication policies of the partitioned topic
-                CompletableFuture<Void> deleteAuthFuture = new 
CompletableFuture<>();
-                pulsar().getPulsarResources().getNamespaceResources()
-                        .setPoliciesAsync(topicName.getNamespaceObject(), p -> 
{
-                            for (int i = 0; i < numPartitions; i++) {
-                                
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
+        validateNamespaceOperationAsync(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> pulsar().getBrokerService()
+                        .fetchPartitionedTopicMetadataAsync(topicName)
+                        .thenCompose(partitionedMeta -> {
+                            final int numPartitions = 
partitionedMeta.partitions;
+                            if (numPartitions < 1){
+                                return CompletableFuture.completedFuture(null);
                             }
-                            
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
-                            return p;
-                        }).thenAccept(v -> {
-                            log.info("Successfully delete authentication 
policies for partitioned topic {}", topicName);
-                            deleteAuthFuture.complete(null);
-                        }).exceptionally(ex -> {
-                            if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
-                                log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
-                                deleteAuthFuture.complete(null);
-                            } else {
-                                log.error("Failed to delete authentication 
policies for partitioned topic {}",
-                                        topicName, ex);
-                                deleteAuthFuture.completeExceptionally(ex);
+                            if (deleteSchema) {
+                                return pulsar().getBrokerService()
+                                        
.deleteSchemaStorage(topicName.getPartition(0).toString())
+                                        .thenCompose(unused ->
+                                                
internalRemovePartitionsAuthenticationPolicies(numPartitions))
+                                        .thenCompose(unused2 -> 
internalRemovePartitionsTopic(numPartitions, force));
                             }
-                            return null;
-                        });
-
-                deleteAuthFuture.whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        future.completeExceptionally(ex);
-                        return;
-                    }
-                    for (int i = 0; i < numPartitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            pulsar().getAdminClient().topics()
-                                    
.deleteAsync(topicNamePartition.toString(), force)
-                                    .whenComplete((r1, ex1) -> {
-                                        if (ex1 != null) {
-                                            if (ex1 instanceof 
NotFoundException) {
-                                                // if the sub-topic is not 
found, the client might not have called
-                                                // create producer or it might 
have been deleted earlier,
-                                                //so we ignore the 404 error.
-                                                // For all other exception,
-                                                //we fail the delete partition 
method even if a single
-                                                // partition is failed to be 
deleted
-                                                if (log.isDebugEnabled()) {
-                                                    log.debug("[{}] Partition 
not found: {}", clientAppId(),
-                                                            
topicNamePartition);
-                                                }
-                                            } else {
-                                                log.error("[{}] Failed to 
delete partition {}", clientAppId(),
-                                                        topicNamePartition, 
ex1);
-                                                
future.completeExceptionally(ex1);
-                                                return;
-                                            }
-                                        } else {
-                                            log.info("[{}] Deleted partition 
{}", clientAppId(), topicNamePartition);
-                                        }
-                                        if (count.decrementAndGet() == 0) {
-                                            future.complete(null);
-                                        }
-                                    });
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to delete partition {}", 
clientAppId(), topicNamePartition, e);
-                            future.completeExceptionally(e);
+                            return 
internalRemovePartitionsAuthenticationPolicies(numPartitions)
+                                    .thenCompose(unused -> 
internalRemovePartitionsTopic(numPartitions, force));
+                        })
+                // Only tries to delete the znode for partitioned topic when 
all its partitions are successfully deleted
+                ).thenCompose(__ -> namespaceResources()
+                                
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted partitioned topic {}", 
clientAppId(), topicName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof WebApplicationException
+                            && ((WebApplicationException) 
realCause).getResponse().getStatus()
+                            == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Failed to delete partitioned topic 
{}, redirecting to other brokers.",
+                                    clientAppId(), topicName, realCause);
                         }
-                    }
-                });
-            } else {
-                future.complete(null);
-            }
-        }).exceptionally(ex -> {
-            future.completeExceptionally(ex);
-            return null;
-        });
-
-        future.whenComplete((r, ex) -> {
-            if (ex != null) {
-                if (ex instanceof PreconditionFailedException) {
-                    asyncResponse.resume(
-                            new RestException(Status.PRECONDITION_FAILED, 
"Topic has active producers/subscriptions"));
-                    return;
-                } else if (ex instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
-                    return;
-                } else if (ex instanceof WebApplicationException) {
-                    asyncResponse.resume(ex);
-                    return;
-                } else {
-                    asyncResponse.resume(new RestException(ex));
-                    return;
-                }
-            }
-            // Only tries to delete the znode for partitioned topic when all 
its partitions are successfully deleted
-            try {
-                namespaceResources().getPartitionedTopicResources()
-                        .deletePartitionedTopicAsync(topicName).thenAccept(r2 
-> {
-                            log.info("[{}] Deleted partitioned topic {}", 
clientAppId(), topicName);
-                            asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex1 -> {
-                    log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, ex1.getCause());
-                    if (ex1.getCause()
-                            instanceof 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException) {
+                    } else if (realCause instanceof 
PreconditionFailedException) {
+                        asyncResponse.resume(
+                                new RestException(Status.PRECONDITION_FAILED,
+                                        "Topic has active 
producers/subscriptions"));
+                    } else if (realCause instanceof WebApplicationException){
+                        asyncResponse.resume(realCause);
+                    } else if (realCause instanceof 
MetadataStoreException.NotFoundException) {
+                        log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
                         asyncResponse.resume(new RestException(
                                 new RestException(Status.NOT_FOUND, 
"Partitioned topic does not exist")));
-                    } else if (ex1
-                            .getCause()
-                            instanceof 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException) {
-                        asyncResponse.resume(
-                                new RestException(new 
RestException(Status.CONFLICT, "Concurrent modification")));
+                    } else if (realCause instanceof PulsarAdminException) {
+                        asyncResponse.resume(new 
RestException((PulsarAdminException) realCause));
+                    } else if (realCause instanceof 
MetadataStoreException.BadVersionException) {
+                        asyncResponse.resume(new RestException(
+                                new RestException(Status.CONFLICT, "Concurrent 
modification")));
                     } else {
-                        asyncResponse.resume(new 
RestException((ex1.getCause())));
+                        log.warn("[{}] Fail to Delete partitioned topic {}", 
clientAppId(), topicName, realCause);
+                        asyncResponse.resume(new RestException(realCause));
                     }
                     return null;
                 });
-            } catch (Exception e1) {
-                log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, e1);
-                asyncResponse.resume(new RestException(e1));
-            }
-        });
+    }
+
+    private CompletableFuture<Void> internalRemovePartitionsTopic(int 
numPartitions, boolean force) {
+        return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
+                .mapToObj(i -> {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        CompletableFuture<Void> deleteFutures = new 
CompletableFuture<>();
+                        pulsar().getAdminClient().topics()
+                                .deleteAsync(topicNamePartition.toString(), 
force)
+                                .whenComplete((r, ex) -> {
+                                    if (ex != null) {
+                                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                                        if (realCause instanceof 
NotFoundException){
+                                            // if the sub-topic is not found, 
the client might not have called
+                                            // create producer or it might 
have been deleted earlier,
+                                            // so we ignore the 404 error.
+                                            // For all other exception,
+                                            // we fail the delete partition 
method even if a single
+                                            // partition is failed to be 
deleted
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("[{}] Partition not 
found: {}", clientAppId(),
+                                                        topicNamePartition);
+                                            }
+                                            deleteFutures.complete(null);
+                                        } else {
+                                            log.error("[{}] Failed to delete 
partition {}", clientAppId(),
+                                                    topicNamePartition, 
realCause);
+                                            
deleteFutures.completeExceptionally(realCause);
+                                        }
+                                    } else {
+                                        deleteFutures.complete(null);
+                                    }
+                                });
+                        return deleteFutures;
+                    } catch (PulsarServerException ex) {
+                        log.error("[{}] Failed to get admin client while 
delete partition {}",
+                                clientAppId(), topicNamePartition, ex);
+                        return FutureUtil.failedFuture(ex);
+                    }
+                }).collect(Collectors.toList()));
+    }
+
+    private CompletableFuture<Void> 
internalRemovePartitionsAuthenticationPolicies(int numPartitions) {
+        CompletableFuture<Void> setPoliciesFuture = new CompletableFuture<>();
+        pulsar().getPulsarResources().getNamespaceResources()
+                .setPoliciesAsync(topicName.getNamespaceObject(), p -> {
+                    IntStream.range(0, numPartitions)
+                            .forEach(i -> 
p.auth_policies.getTopicAuthentication()
+                                    
.remove(topicName.getPartition(i).toString()));
+                    
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
+                    return p;
+                })
+                .whenComplete((r, ex) -> {
+                    if (ex != null){
+                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (realCause instanceof 
MetadataStoreException.NotFoundException) {
+                            log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
+                            setPoliciesFuture.complete(null);
+                        } else {
+                            log.error("Failed to delete authentication 
policies for partitioned topic {}",
+                                    topicName, ex);
+                            setPoliciesFuture.completeExceptionally(realCause);
+                        }
+                    }

Review comment:
       add `else` block




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to