This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 814008066f0 [improve][broker][PIP-149] make deletePersistence method async in Namespaces (#17206)
814008066f0 is described below
commit 814008066f0217099b2b7cf795647403c32d5204
Author: HuangZeGui <[email protected]>
AuthorDate: Tue Aug 30 11:24:23 2022 +0800
[improve][broker][PIP-149] make deletePersistence method async in Namespaces (#17206)
* make deletePersistence method async in Namespaces
* update comment
* remove irrelevant import
* reduce unnecessary exceptions
* remove redundant exception log printing logic
Co-authored-by: huangzegui <[email protected]>
---
.../apache/pulsar/broker/admin/impl/NamespacesBase.java | 17 +++++++++++++----
.../org/apache/pulsar/broker/admin/v2/Namespaces.java | 13 ++++++++++---
2 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..08c968a2d5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1543,10 +1543,10 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected void internalDeletePersistence() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- doUpdatePersistence(null);
+ protected CompletableFuture<Void> internalDeletePersistenceAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> doUpdatePersistenceAsync(null));
}
protected void internalSetPersistence(PersistencePolicies persistence) {
@@ -1572,6 +1572,15 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
+ private CompletableFuture<Void>
doUpdatePersistenceAsync(PersistencePolicies persistence) {
+ return updatePoliciesAsync(namespaceName, policies -> {
+ policies.persistence = persistence;
+ return policies;
+ }).thenAccept(__ -> log.info("[{}] Successfully updated persistence
configuration: namespace={}, map={}",
+ clientAppId(), namespaceName, persistence)
+ );
+ }
+
protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse,
boolean authoritative) {
validateNamespaceOperation(namespaceName,
NamespaceOperation.CLEAR_BACKLOG);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9c24bc65d61..e43e01ced93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1255,10 +1255,17 @@ public class Namespaces extends NamespacesBase {
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Delete the persistence configuration for all topics
on a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void deletePersistence(@PathParam("tenant") String tenant,
- @PathParam("namespace") String
namespace) {
+ public void deletePersistence(@Suspended final AsyncResponse
asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeletePersistence();
+ internalDeletePersistenceAsync()
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to delete the persistence for a
namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST