This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ca45181f4db [improve][broker]Ensure namespace deletion doesn't fail (#22627)
ca45181f4db is described below
commit ca45181f4db85f7a381bd9846ba45df2cbd81c36
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon May 13 11:50:39 2024 +0200
[improve][broker]Ensure namespace deletion doesn't fail (#22627)
(cherry picked from commit 936afecede8374b14d13e9d48e9372fec1c27447)
---
.../pulsar/broker/resources/BaseResources.java | 27 ++++++++---------
.../broker/resources/LocalPoliciesResources.java | 2 +-
.../broker/resources/NamespaceResources.java | 17 +++++++++--
.../pulsar/broker/resources/TopicResources.java | 35 ++++------------------
.../pulsar/broker/admin/impl/NamespacesBase.java | 16 ++++++++--
.../SystemTopicBasedTopicPoliciesService.java | 3 +-
.../apache/pulsar/metadata/api/MetadataStore.java | 22 ++++++++++++++
.../metadata/impl/AbstractMetadataStore.java | 13 ++++----
8 files changed, 78 insertions(+), 57 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 4011a482075..00e381e0729 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -197,22 +197,21 @@ public class BaseResources<T> {
}
protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
- return cache.exists(path).thenCompose(exists -> {
- if (!exists) {
- return CompletableFuture.completedFuture(null);
+ log.info("Deleting path: {}", path);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ cache.delete(path).whenComplete((ignore, ex) -> {
+ if (ex != null && ex.getCause() instanceof
MetadataStoreException.NotFoundException) {
+ log.info("Path {} did not exist in metadata store", path);
+ future.complete(null);
+ } else if (ex != null) {
+ log.info("Failed to delete path from metadata store: {}",
path, ex);
+ future.completeExceptionally(ex);
+ } else {
+ log.info("Deleted path from metadata store: {}", path);
+ future.complete(null);
}
- CompletableFuture<Void> future = new CompletableFuture<>();
- cache.delete(path).whenComplete((ignore, ex) -> {
- if (ex != null && ex.getCause() instanceof
MetadataStoreException.NotFoundException) {
- future.complete(null);
- } else if (ex != null) {
- future.completeExceptionally(ex);
- } else {
- future.complete(null);
- }
- });
- return future;
});
+ return future;
}
protected boolean exists(String path) throws MetadataStoreException {
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index c6b658c3bd0..ae3479fde59 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -79,7 +79,7 @@ public class LocalPoliciesResources extends
BaseResources<LocalPolicies> {
}
public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
- return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
+ return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT,
ns.toString()));
}
public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String
tenant) {
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 975b23192f9..9d7c60cd344 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -115,7 +115,7 @@ public class NamespaceResources extends
BaseResources<Policies> {
}
public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
- return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
+ return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH,
ns.toString()));
}
public Optional<Policies> getPolicies(NamespaceName ns) throws
MetadataStoreException{
@@ -155,10 +155,18 @@ public class NamespaceResources extends
BaseResources<Policies> {
&& path.substring(LOCAL_POLICIES_ROOT.length() +
1).contains("/");
}
- // clear resource of `/namespace/{namespaceName}` for zk-node
+ /**
+ * Clear resource of `/namespace/{namespaceName}` for zk-node.
+ * @param ns the namespace name
+ * @return a handle to the results of the operation
+ * */
+ //
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH,
ns.toString());
- return deleteIfExistsAsync(namespacePath);
+ // please beware that this will delete all the children of the namespace
+ // including the ownership nodes (ephemeral nodes)
+ // see ServiceUnitUtils.path(ns) for the ownership node path
+ return getStore().deleteRecursive(namespacePath);
}
// clear resource of `/namespace/{tenant}` for zk-node
@@ -303,11 +311,14 @@ public class NamespaceResources extends
BaseResources<Policies> {
public CompletableFuture<Void>
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath =
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
+ log.info("Clearing partitioned topic metadata for namespace {},
path is {}",
+ namespaceName, globalPartitionedPath);
return getStore().deleteRecursive(globalPartitionedPath);
}
public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String
tenant) {
final String partitionedTopicPath =
joinPath(PARTITIONED_TOPIC_PATH, tenant);
+ log.info("Clearing partitioned topic metadata for tenant {}, path
is {}", tenant, partitionedTopicPath);
return deleteIfExistsAsync(partitionedTopicPath);
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 413184764f5..f607da76b3c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -75,11 +75,6 @@ public class TopicResources {
);
}
- public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic)
{
- String path = MANAGED_LEDGER_PATH + "/" +
topic.getPersistenceNamingEncoding();
- return store.delete(path, Optional.of(-1L));
- }
-
public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic)
{
String path = MANAGED_LEDGER_PATH + "/" +
topic.getPersistenceNamingEncoding();
return store.put(path, new byte[0], Optional.of(-1L))
@@ -93,38 +88,20 @@ public class TopicResources {
public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns)
{
String path = MANAGED_LEDGER_PATH + "/" + ns;
- return store.exists(path)
- .thenCompose(exists -> {
- if (exists) {
- return store.delete(path, Optional.empty());
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ log.info("Clearing namespace persistence for namespace: {}, path {}",
ns, path);
+ return store.deleteIfExists(path, Optional.empty());
}
public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
- return store.exists(path)
- .thenCompose(exists -> {
- if (exists) {
- return store.delete(path, Optional.empty());
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ log.info("Clearing domain persistence for namespace: {}, path {}", ns,
path);
+ return store.deleteIfExists(path, Optional.empty());
}
public CompletableFuture<Void> clearTenantPersistence(String tenant) {
String path = MANAGED_LEDGER_PATH + "/" + tenant;
- return store.exists(path)
- .thenCompose(exists -> {
- if (exists) {
- return store.deleteRecursive(path);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ log.info("Clearing tenant persistence for tenant: {}, path {}",
tenant, path);
+ return store.deleteRecursive(path);
}
void handleNotification(Notification notification) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7d27605fb5e..4d26fe2a4c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -309,8 +309,14 @@ public abstract class NamespacesBase extends AdminResource
{
clientAppId(), ex);
return FutureUtil.failedFuture(ex);
}
+ log.info("[{}] Deleting namespace
bundle {}/{}", clientAppId(),
+ namespaceName,
bundle.getBundleRange());
return
admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
bundle.getBundleRange(),
force);
+ } else {
+ log.warn("[{}] Skipping deleting
namespace bundle {}/{} "
+ + "as it's not owned
by any broker",
+ clientAppId(), namespaceName,
bundle.getBundleRange());
}
return
CompletableFuture.completedFuture(null);
})
@@ -321,8 +327,11 @@ public abstract class NamespacesBase extends AdminResource
{
final Throwable rc =
FutureUtil.unwrapCompletionException(error);
if (rc instanceof MetadataStoreException) {
if (rc.getCause() != null && rc.getCause()
instanceof KeeperException.NotEmptyException) {
+ KeeperException.NotEmptyException ne =
+ (KeeperException.NotEmptyException)
rc.getCause();
log.info("[{}] There are in-flight topics
created during the namespace deletion, "
- + "retry to delete the namespace
again.", namespaceName);
+ + "retry to delete the namespace
again. (path {} is not empty on metadata)",
+ namespaceName, ne.getPath());
final int next = retryTimes - 1;
if (next > 0) {
// async recursive
@@ -330,7 +339,8 @@ public abstract class NamespacesBase extends AdminResource {
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT,
"The broker still have in-flight topics"
- + " created during
namespace deletion, please try again."));
+ + " created during
namespace deletion (path " + ne.getPath() + ") "
+ + "is not empty on
metadata store, please try again."));
// drop out recursive
}
return;
@@ -476,6 +486,8 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected CompletableFuture<Void>
internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
+ log.info("[{}] Deleting namespace bundle {}/{} authoritative:{}
force:{}",
+ clientAppId(), namespaceName, bundleRange, authoritative,
force);
return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.DELETE_BUNDLE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 6d18d6d61b0..5156246bb5e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -543,7 +543,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
} else {
Throwable cause =
FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
PulsarClientException.AlreadyClosedException) {
- log.warn("Read more topic policies exception,
close the read now!", ex);
+ log.info("Closing the topic policies reader for
{}",
+ reader.getSystemTopic().getTopicName());
cleanCacheAndCloseReader(
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 33942c19520..89b0e7a6fe1 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -23,9 +23,12 @@ import com.google.common.annotations.Beta;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Metadata store client interface.
@@ -36,6 +39,8 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
@Beta
public interface MetadataStore extends AutoCloseable {
+ Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class);
+
/**
* Read the value of one key, identified by the path
*
@@ -121,6 +126,23 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion);
+ default CompletableFuture<Void> deleteIfExists(String path, Optional<Long>
expectedVersion) {
+ return delete(path, expectedVersion)
+ .exceptionally(e -> {
+ if (e.getCause() instanceof NotFoundException) {
+ LOGGER.info("Path {} not found while deleting (this is
not a problem)", path);
+ return null;
+ } else {
+ if (expectedVersion.isEmpty()) {
+ LOGGER.info("Failed to delete path {}", path, e);
+ } else {
+ LOGGER.info("Failed to delete path {} with
expected version {}", path, expectedVersion, e);
+ }
+ throw new CompletionException(e);
+ }
+ });
+ }
+
/**
* Delete a key-value pair and all the children nodes.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 77fd21f1342..7315e6a04a2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -357,6 +357,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public final CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
+ log.info("Deleting path: {} (v. {})", path, expectedVersion);
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -401,11 +402,13 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
metadataCaches.forEach(c -> c.invalidate(path));
+ log.info("Deleted path: {} (v. {})", path, expectedVersion);
});
}
@Override
public CompletableFuture<Void> deleteRecursive(String path) {
+ log.info("Deleting recursively path: {}", path);
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -414,13 +417,9 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
children.stream()
.map(child -> deleteRecursive(path + "/" +
child))
.collect(Collectors.toList())))
- .thenCompose(__ -> exists(path))
- .thenCompose(exists -> {
- if (exists) {
- return delete(path, Optional.empty());
- } else {
- return CompletableFuture.completedFuture(null);
- }
+ .thenCompose(__ -> {
+ log.info("After deleting all children, now deleting path:
{}", path);
+ return deleteIfExists(path, Optional.empty());
});
}