This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 38555851359 [fix][broker] Fix delete namespace fail by a In-flight
topic (#19374)
38555851359 is described below
commit 38555851359f9cfc172650c387a58c5a03809e97
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Feb 17 21:39:11 2023 +0800
[fix][broker] Fix delete namespace fail by a In-flight topic (#19374)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 183 +++++++++++++--------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 7 -
.../apache/pulsar/broker/admin/v2/Namespaces.java | 7 -
.../pulsar/broker/service/AbstractTopic.java | 3 -
.../apache/pulsar/broker/admin/AdminApi2Test.java | 6 +
5 files changed, 119 insertions(+), 87 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 6746d29af73..30f01ece7de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
@@ -113,6 +114,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.zookeeper.KeeperException;
@Slf4j
public abstract class NamespacesBase extends AdminResource {
@@ -202,78 +204,94 @@ public abstract class NamespacesBase extends
AdminResource {
});
}
- @SuppressWarnings("unchecked")
- protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean
force) {
- CompletableFuture<Policies> preconditionCheck =
precheckWhenDeleteNamespace(namespaceName, force);
- return preconditionCheck
+ /**
+ * Delete the namespace, retrying when in-flight topics that were not yet
fully created (in metadata) are found
+ * during the deletion.
+ */
+ protected @Nonnull CompletableFuture<Void>
internalDeleteNamespaceAsync(boolean force) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ internalRetryableDeleteNamespaceAsync0(force, 5, future);
+ return future;
+ }
+ private void internalRetryableDeleteNamespaceAsync0(boolean force, int
retryTimes,
+ @Nonnull
CompletableFuture<Void> callback) {
+ precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
+ final CompletableFuture<List<String>> topicsFuture;
if (policies == null ||
CollectionUtils.isEmpty(policies.replication_clusters)){
- return
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
- }
- return
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
- })
- .thenCompose(allTopics ->
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
- .thenCompose(allPartitionedTopics -> {
- List<List<String>> topicsSum = new ArrayList<>(2);
- topicsSum.add(allTopics);
- topicsSum.add(allPartitionedTopics);
- return
CompletableFuture.completedFuture(topicsSum);
- }))
- .thenCompose(topics -> {
- List<String> allTopics = topics.get(0);
- ArrayList<String> allUserCreatedTopics = new ArrayList<>();
- List<String> allPartitionedTopics = topics.get(1);
- ArrayList<String> allUserCreatedPartitionTopics = new
ArrayList<>();
- boolean hasNonSystemTopic = false;
- List<String> allSystemTopics = new ArrayList<>();
- List<String> allPartitionedSystemTopics = new
ArrayList<>();
- List<String> topicPolicy = new ArrayList<>();
- List<String> partitionedTopicPolicy = new ArrayList<>();
- for (String topic : allTopics) {
- if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- allUserCreatedTopics.add(topic);
- } else {
- if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
- topicPolicy.add(topic);
- } else {
- allSystemTopics.add(topic);
- }
- }
- }
- for (String topic : allPartitionedTopics) {
- if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- allUserCreatedPartitionTopics.add(topic);
- } else {
- if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
- partitionedTopicPolicy.add(topic);
- } else {
- allPartitionedSystemTopics.add(topic);
- }
- }
- }
- if (!force) {
- if (hasNonSystemTopic) {
- throw new RestException(Status.CONFLICT, "Cannot
delete non empty namespace");
- }
+ topicsFuture =
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ } else {
+ topicsFuture =
pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
}
- return
namespaceResources().setPoliciesAsync(namespaceName, old -> {
- old.deleted = true;
- return old;
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(allUserCreatedTopics);
- }).thenCompose(ignore -> {
- return
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(allSystemTopics);
- }).thenCompose(ignore__ -> {
- return
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(topicPolicy);
- }).thenCompose(ignore__ -> {
- return
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
- });
+ return topicsFuture.thenCompose(allTopics ->
+
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
+ .thenCompose(allPartitionedTopics -> {
+ List<List<String>> topicsSum = new
ArrayList<>(2);
+ topicsSum.add(allTopics);
+ topicsSum.add(allPartitionedTopics);
+ return
CompletableFuture.completedFuture(topicsSum);
+ }))
+ .thenCompose(topics -> {
+ List<String> allTopics = topics.get(0);
+ ArrayList<String> allUserCreatedTopics = new
ArrayList<>();
+ List<String> allPartitionedTopics =
topics.get(1);
+ ArrayList<String>
allUserCreatedPartitionTopics = new ArrayList<>();
+ boolean hasNonSystemTopic = false;
+ List<String> allSystemTopics = new
ArrayList<>();
+ List<String> allPartitionedSystemTopics = new
ArrayList<>();
+ List<String> topicPolicy = new ArrayList<>();
+ List<String> partitionedTopicPolicy = new
ArrayList<>();
+ for (String topic : allTopics) {
+ if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+ allUserCreatedTopics.add(topic);
+ } else {
+ if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ topicPolicy.add(topic);
+ } else {
+ allSystemTopics.add(topic);
+ }
+ }
+ }
+ for (String topic : allPartitionedTopics) {
+ if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+
allUserCreatedPartitionTopics.add(topic);
+ } else {
+ if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ partitionedTopicPolicy.add(topic);
+ } else {
+
allPartitionedSystemTopics.add(topic);
+ }
+ }
+ }
+ if (!force) {
+ if (hasNonSystemTopic) {
+ throw new
RestException(Status.CONFLICT, "Cannot delete non empty namespace");
+ }
+ }
+ final CompletableFuture<Void> markDeleteFuture;
+ if (policies != null && policies.deleted) {
+ markDeleteFuture =
CompletableFuture.completedFuture(null);
+ } else {
+ markDeleteFuture =
namespaceResources().setPoliciesAsync(namespaceName, old -> {
+ old.deleted = true;
+ return old;
+ });
+ }
+ return markDeleteFuture.thenCompose(__ ->
+
internalDeleteTopicsAsync(allUserCreatedTopics))
+ .thenCompose(ignore ->
+
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
+ .thenCompose(ignore ->
+
internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose(ignore ->
+
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore ->
+
internalDeleteTopicsAsync(topicPolicy))
+ .thenCompose(ignore ->
+
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
+ });
})
.thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
@@ -297,7 +315,32 @@ public abstract class NamespacesBase extends AdminResource
{
return
CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
- .thenCompose(ignore -> internalClearZkSources());
+ .thenCompose(ignore -> internalClearZkSources())
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(error);
+ if (rc instanceof MetadataStoreException) {
+ if (rc.getCause() != null && rc.getCause()
instanceof KeeperException.NotEmptyException) {
+ log.info("[{}] There are in-flight topics
created during the namespace deletion, "
+ + "retry to delete the namespace
again.", namespaceName);
+ final int next = retryTimes - 1;
+ if (next > 0) {
+ // retry asynchronously via a recursive call
+
internalRetryableDeleteNamespaceAsync0(force, next, callback);
+ } else {
+ callback.completeExceptionally(
+ new RestException(Status.CONFLICT,
"The broker still have in-flight topics"
+ + " created during
namespace deletion, please try again."));
+ // retries exhausted; stop recursing
+ }
+ return;
+ }
+ }
+ callback.completeExceptionally(error);
+ return;
+ }
+ callback.complete(result);
+ });
}
private CompletableFuture<Void>
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 59613558eb8..153e29506c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -47,7 +47,6 @@ import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -249,12 +248,6 @@ public class Namespaces extends NamespacesBase {
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
PulsarAdminException.ConflictException) {
- log.info("[{}] There are new topics created during the
namespace deletion, "
- + "retry to delete the namespace again.",
namespaceName);
- pulsar().getExecutor().execute(() ->
internalDeleteNamespaceAsync(force));
- }
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}",
clientAppId(), namespaceName, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index f5e23db79b9..12ff229c2f0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -49,7 +49,6 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -197,12 +196,6 @@ public class Namespaces extends NamespacesBase {
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
PulsarAdminException.ConflictException) {
- log.info("[{}] There are new topics created during the
namespace deletion, "
- + "retry to delete the namespace again.",
namespaceName);
- pulsar().getExecutor().execute(() ->
internalDeleteNamespaceAsync(force));
- }
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}",
clientAppId(), namespaceName, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4e095cd66ba..6245ce19eeb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -249,9 +249,6 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic,
namespacePolicies);
}
- if (namespacePolicies.deleted) {
- return;
- }
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 66b2b1d1470..90be220cfd8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -46,6 +46,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
@@ -1681,6 +1682,11 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// verify namespace can be deleted even without topic policy events
admin.namespaces().deleteNamespace(namespace, true);
+ Awaitility.await().untilAsserted(() -> {
+ final CompletableFuture<Optional<Topic>> eventTopicFuture =
+
pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events");
+ assertNull(eventTopicFuture);
+ });
admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";