This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9077a73b30e [fix][broker] Retry to delete the namespace if new topics
created during the namespace deletion (#16676)
9077a73b30e is described below
commit 9077a73b30ea1ad0b8fa2f0cda589797ffcece2e
Author: Penghui Li <[email protected]>
AuthorDate: Wed Jul 20 23:11:44 2022 +0800
[fix][broker] Retry to delete the namespace if new topics created during
the namespace deletion (#16676)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 26 +++++++++++++++-------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index cdfd6a735cf..f5da9ff6aa3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -346,8 +346,15 @@ public abstract class NamespacesBase extends AdminResource {
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- log.error("[{}] Failed to remove namespace {}", clientAppId(),
namespaceName, ex.getCause());
- resumeAsyncResponseExceptionally(asyncResponse, ex);
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to remove namespace {}", clientAppId(),
namespaceName, cause);
+ if (cause instanceof PulsarAdminException.ConflictException) {
+ log.info("[{}] There are new topics created during the
namespace deletion, "
+ + "retry to delete the namespace again.",
namespaceName);
+ pulsar().getExecutor().execute(() ->
internalDeleteNamespace(asyncResponse, authoritative));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
return null;
});
}
@@ -538,15 +545,18 @@ public abstract class NamespacesBase extends AdminResource {
FutureUtil.waitForAll(bundleFutures).thenCompose(__ ->
internalClearZkSources()).handle((result, exception) -> {
if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
+ Throwable cause =
FutureUtil.unwrapCompletionException(exception);
+ if (cause instanceof PulsarAdminException.ConflictException) {
+ log.info("[{}] There are new topics created during the
namespace deletion, "
+ + "retry to force delete the namespace again.",
namespaceName);
+ pulsar().getExecutor().execute(() ->
+ internalDeleteNamespaceForcefully(asyncResponse,
authoritative));
} else {
log.error("[{}] Failed to remove forcefully owned
namespace {}",
- clientAppId(), namespaceName, exception);
- asyncResponse.resume(new
RestException(exception.getCause()));
- return null;
+ clientAppId(), namespaceName, cause);
+ asyncResponse.resume(new RestException(cause));
}
+ return null;
}
asyncResponse.resume(Response.noContent().build());
return null;