This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 38555851359 [fix][broker] Fix delete namespace fail by a In-flight 
topic (#19374)
38555851359 is described below

commit 38555851359f9cfc172650c387a58c5a03809e97
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Feb 17 21:39:11 2023 +0800

    [fix][broker] Fix delete namespace fail by a In-flight topic (#19374)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 183 +++++++++++++--------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |   7 -
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |   7 -
 .../pulsar/broker/service/AbstractTopic.java       |   3 -
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   6 +
 5 files changed, 119 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 6746d29af73..30f01ece7de 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
@@ -113,6 +114,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.zookeeper.KeeperException;
 
 @Slf4j
 public abstract class NamespacesBase extends AdminResource {
@@ -202,78 +204,94 @@ public abstract class NamespacesBase extends 
AdminResource {
                 });
     }
 
-    @SuppressWarnings("unchecked")
-    protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean 
force) {
-        CompletableFuture<Policies> preconditionCheck = 
precheckWhenDeleteNamespace(namespaceName, force);
-        return preconditionCheck
+    /**
+     * Delete the namespace, retrying the deletion when it fails because of 
in-flight topics that were created (in metadata)
+     * while the deletion was in progress.
+     */
+    protected @Nonnull CompletableFuture<Void> 
internalDeleteNamespaceAsync(boolean force) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        internalRetryableDeleteNamespaceAsync0(force, 5, future);
+        return future;
+    }
+    private void internalRetryableDeleteNamespaceAsync0(boolean force, int 
retryTimes,
+                                                        @Nonnull 
CompletableFuture<Void> callback) {
+        precheckWhenDeleteNamespace(namespaceName, force)
                 .thenCompose(policies -> {
+                    final CompletableFuture<List<String>> topicsFuture;
                     if (policies == null || 
CollectionUtils.isEmpty(policies.replication_clusters)){
-                        return 
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
-                    }
-                    return 
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
-                })
-                .thenCompose(allTopics -> 
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
-                         .thenCompose(allPartitionedTopics -> {
-                             List<List<String>> topicsSum = new ArrayList<>(2);
-                             topicsSum.add(allTopics);
-                             topicsSum.add(allPartitionedTopics);
-                             return 
CompletableFuture.completedFuture(topicsSum);
-                         }))
-                .thenCompose(topics -> {
-                    List<String> allTopics = topics.get(0);
-                    ArrayList<String> allUserCreatedTopics = new ArrayList<>();
-                    List<String> allPartitionedTopics = topics.get(1);
-                    ArrayList<String> allUserCreatedPartitionTopics = new 
ArrayList<>();
-                    boolean hasNonSystemTopic = false;
-                    List<String> allSystemTopics = new ArrayList<>();
-                    List<String> allPartitionedSystemTopics = new 
ArrayList<>();
-                    List<String> topicPolicy = new ArrayList<>();
-                    List<String> partitionedTopicPolicy = new ArrayList<>();
-                    for (String topic : allTopics) {
-                        if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
-                            hasNonSystemTopic = true;
-                            allUserCreatedTopics.add(topic);
-                        } else {
-                            if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
-                                topicPolicy.add(topic);
-                            } else {
-                                allSystemTopics.add(topic);
-                            }
-                        }
-                    }
-                    for (String topic : allPartitionedTopics) {
-                        if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
-                            hasNonSystemTopic = true;
-                            allUserCreatedPartitionTopics.add(topic);
-                        } else {
-                            if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
-                                partitionedTopicPolicy.add(topic);
-                            } else {
-                                allPartitionedSystemTopics.add(topic);
-                            }
-                        }
-                    }
-                    if (!force) {
-                        if (hasNonSystemTopic) {
-                            throw new RestException(Status.CONFLICT, "Cannot 
delete non empty namespace");
-                        }
+                        topicsFuture = 
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+                    } else {
+                        topicsFuture = 
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
                     }
-                    return 
namespaceResources().setPoliciesAsync(namespaceName, old -> {
-                        old.deleted = true;
-                        return  old;
-                    }).thenCompose(ignore -> {
-                        return internalDeleteTopicsAsync(allUserCreatedTopics);
-                    }).thenCompose(ignore -> {
-                        return 
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
-                    }).thenCompose(ignore -> {
-                        return internalDeleteTopicsAsync(allSystemTopics);
-                    }).thenCompose(ignore__ -> {
-                        return 
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
-                    }).thenCompose(ignore -> {
-                        return internalDeleteTopicsAsync(topicPolicy);
-                    }).thenCompose(ignore__ -> {
-                        return 
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
-                    });
+                    return topicsFuture.thenCompose(allTopics ->
+                            
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
+                                    .thenCompose(allPartitionedTopics -> {
+                                        List<List<String>> topicsSum = new 
ArrayList<>(2);
+                                        topicsSum.add(allTopics);
+                                        topicsSum.add(allPartitionedTopics);
+                                        return 
CompletableFuture.completedFuture(topicsSum);
+                                    }))
+                            .thenCompose(topics -> {
+                                List<String> allTopics = topics.get(0);
+                                ArrayList<String> allUserCreatedTopics = new 
ArrayList<>();
+                                List<String> allPartitionedTopics = 
topics.get(1);
+                                ArrayList<String> 
allUserCreatedPartitionTopics = new ArrayList<>();
+                                boolean hasNonSystemTopic = false;
+                                List<String> allSystemTopics = new 
ArrayList<>();
+                                List<String> allPartitionedSystemTopics = new 
ArrayList<>();
+                                List<String> topicPolicy = new ArrayList<>();
+                                List<String> partitionedTopicPolicy = new 
ArrayList<>();
+                                for (String topic : allTopics) {
+                                    if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+                                        hasNonSystemTopic = true;
+                                        allUserCreatedTopics.add(topic);
+                                    } else {
+                                        if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                            topicPolicy.add(topic);
+                                        } else {
+                                            allSystemTopics.add(topic);
+                                        }
+                                    }
+                                }
+                                for (String topic : allPartitionedTopics) {
+                                    if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+                                        hasNonSystemTopic = true;
+                                        
allUserCreatedPartitionTopics.add(topic);
+                                    } else {
+                                        if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                            partitionedTopicPolicy.add(topic);
+                                        } else {
+                                            
allPartitionedSystemTopics.add(topic);
+                                        }
+                                    }
+                                }
+                                if (!force) {
+                                    if (hasNonSystemTopic) {
+                                        throw new 
RestException(Status.CONFLICT, "Cannot delete non empty namespace");
+                                    }
+                                }
+                                final CompletableFuture<Void> markDeleteFuture;
+                                if (policies != null && policies.deleted) {
+                                    markDeleteFuture = 
CompletableFuture.completedFuture(null);
+                                } else {
+                                    markDeleteFuture = 
namespaceResources().setPoliciesAsync(namespaceName, old -> {
+                                        old.deleted = true;
+                                        return old;
+                                    });
+                                }
+                                return markDeleteFuture.thenCompose(__ ->
+                                                
internalDeleteTopicsAsync(allUserCreatedTopics))
+                                        .thenCompose(ignore ->
+                                                
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
+                                        .thenCompose(ignore ->
+                                                
internalDeleteTopicsAsync(allSystemTopics))
+                                        .thenCompose(ignore ->
+                                                
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                        .thenCompose(ignore ->
+                                                
internalDeleteTopicsAsync(topicPolicy))
+                                        .thenCompose(ignore ->
+                                                
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
+                            });
                 })
                 .thenCompose(ignore -> pulsar().getNamespaceService()
                         
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
@@ -297,7 +315,32 @@ public abstract class NamespacesBase extends AdminResource 
{
                                     return 
CompletableFuture.completedFuture(null);
                                 })
                         ).collect(Collectors.toList())))
-                .thenCompose(ignore -> internalClearZkSources());
+                .thenCompose(ignore -> internalClearZkSources())
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        final Throwable rc = 
FutureUtil.unwrapCompletionException(error);
+                        if (rc instanceof MetadataStoreException) {
+                            if (rc.getCause() != null && rc.getCause() 
instanceof KeeperException.NotEmptyException) {
+                                log.info("[{}] There are in-flight topics 
created during the namespace deletion, "
+                                        + "retry to delete the namespace 
again.", namespaceName);
+                                final int next = retryTimes - 1;
+                                if (next > 0) {
+                                    // async recursive
+                                    
internalRetryableDeleteNamespaceAsync0(force, next, callback);
+                                } else {
+                                    callback.completeExceptionally(
+                                            new RestException(Status.CONFLICT, 
"The broker still have in-flight topics"
+                                                    + " created during 
namespace deletion, please try again."));
+                                    // drop out recursive
+                                }
+                                return;
+                            }
+                        }
+                        callback.completeExceptionally(error);
+                        return;
+                    }
+                    callback.complete(result);
+                });
     }
 
     private CompletableFuture<Void> 
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 59613558eb8..153e29506c3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -47,7 +47,6 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -249,12 +248,6 @@ public class Namespaces extends NamespacesBase {
                     asyncResponse.resume(Response.noContent().build());
                 })
                 .exceptionally(ex -> {
-                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                    if (cause instanceof 
PulsarAdminException.ConflictException) {
-                        log.info("[{}] There are new topics created during the 
namespace deletion, "
-                                + "retry to delete the namespace again.", 
namespaceName);
-                        pulsar().getExecutor().execute(() -> 
internalDeleteNamespaceAsync(force));
-                    }
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to delete namespace {}", 
clientAppId(), namespaceName, ex);
                     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index f5e23db79b9..12ff229c2f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -49,7 +49,6 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -197,12 +196,6 @@ public class Namespaces extends NamespacesBase {
                     asyncResponse.resume(Response.noContent().build());
                 })
                 .exceptionally(ex -> {
-                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                    if (cause instanceof 
PulsarAdminException.ConflictException) {
-                        log.info("[{}] There are new topics created during the 
namespace deletion, "
-                                + "retry to delete the namespace again.", 
namespaceName);
-                        pulsar().getExecutor().execute(() -> 
internalDeleteNamespaceAsync(force));
-                    }
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to delete namespace {}", 
clientAppId(), namespaceName, ex);
                     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4e095cd66ba..6245ce19eeb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -249,9 +249,6 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         if (log.isDebugEnabled()) {
             log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, 
namespacePolicies);
         }
-        if (namespacePolicies.deleted) {
-            return;
-        }
         
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
         
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
         topicPolicies.getReplicationClusters().updateNamespaceValue(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 66b2b1d1470..90be220cfd8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -46,6 +46,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.NotAcceptableException;
 import javax.ws.rs.core.Response.Status;
@@ -1681,6 +1682,11 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         // verify namespace can be deleted even without topic policy events
         admin.namespaces().deleteNamespace(namespace, true);
 
+        Awaitility.await().untilAsserted(() -> {
+            final CompletableFuture<Optional<Topic>> eventTopicFuture =
+                    
pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events");
+            assertNull(eventTopicFuture);
+        });
         admin.namespaces().createNamespace(namespace, Set.of("test"));
         // create topic
         String topic = namespace + "/test-topic2";

Reply via email to