This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb290c8d39 [improve][broker][PIP-149]make setPersistence method async 
in Namespaces (#17421)
7bb290c8d39 is described below

commit 7bb290c8d394811d436a3e97054d81d4c8208af4
Author: HuangZeGui <[email protected]>
AuthorDate: Sun Sep 11 15:34:50 2022 +0800

    [improve][broker][PIP-149]make setPersistence method async in Namespaces 
(#17421)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 26 ++++---------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 14 +++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 12 +++++++--
 .../apache/pulsar/broker/admin/NamespacesTest.java | 30 +++++++++++++++-------
 4 files changed, 47 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d0f1ef0acc2..b8f3ac8a7a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1509,27 +1509,11 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenCompose(__ -> doUpdatePersistenceAsync(null));
     }
 
-    protected void internalSetPersistence(PersistencePolicies persistence) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        validatePersistencePolicies(persistence);
-
-        doUpdatePersistence(persistence);
-    }
-
-    private void doUpdatePersistence(PersistencePolicies persistence) {
-        try {
-            updatePolicies(namespaceName, policies -> {
-                policies.persistence = persistence;
-                return policies;
-            });
-            log.info("[{}] Successfully updated persistence configuration: 
namespace={}, map={}", clientAppId(),
-                    namespaceName, 
jsonMapper().writeValueAsString(persistence));
-        } catch (Exception e) {
-            log.error("[{}] Failed to update persistence configuration for 
namespace {}", clientAppId(), namespaceName,
-                    e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> 
internalSetPersistenceAsync(PersistencePolicies persistence) {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> validatePersistencePolicies(persistence))
+                .thenCompose(__ -> doUpdatePersistenceAsync(persistence));
     }
 
     private CompletableFuture<Void> 
doUpdatePersistenceAsync(PersistencePolicies persistence) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4f603aaeb30..54f4c0c85c7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1213,10 +1213,18 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 400, message = "Invalid persistence policies") 
})
-    public void setPersistence(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, PersistencePolicies 
persistence) {
+    public void setPersistence(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
+                               @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
+                               PersistencePolicies persistence) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetPersistence(persistence);
+        internalSetPersistenceAsync(persistence)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the persistence for a 
namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 445641718da..03f5bbdd115 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1235,11 +1235,19 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 400, message = "Invalid persistence 
policies")})
-    public void setPersistence(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void setPersistence(@Suspended final AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                               @PathParam("namespace") String namespace,
                                @ApiParam(value = "Persistence policies for the 
specified namespace", required = true)
                                        PersistencePolicies persistence) {
         validateNamespaceName(tenant, namespace);
-        internalSetPersistence(persistence);
+        internalSetPersistenceAsync(persistence)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the persistence for a 
namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c23407bb447..73cf4914ffe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.zookeeper.KeeperException.Code;
@@ -196,6 +197,14 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
                         PolicyName.RETENTION, PolicyOperation.WRITE);
 
+        doReturn(FutureUtil.failedFuture(new 
RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+                
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+        doReturn(FutureUtil.failedFuture(new 
RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+                
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.RETENTION, PolicyOperation.WRITE);
+
         nsSvc = pulsar.getNamespaceService();
     }
 
@@ -1065,8 +1074,12 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     public void testPersistence() throws Exception {
         NamespaceName testNs = this.testLocalNamespaces.get(0);
         PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1, 
0.0);
-        namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName(), persistence1);
         AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.setPersistence(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), persistence1);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
         namespaces.getPersistence(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName());
         ArgumentCaptor<PersistencePolicies> captor = 
ArgumentCaptor.forClass(PersistencePolicies.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
@@ -1076,14 +1089,13 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testPersistenceUnauthorized() throws Exception {
-        try {
-            NamespaceName testNs = this.testLocalNamespaces.get(3);
-            PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 
0.0);
-            namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName(), persistence);
-            fail("Should fail");
-        } catch (RestException e) {
-            assertEquals(e.getResponse().getStatus(), 
Status.UNAUTHORIZED.getStatusCode());
-        }
+        NamespaceName testNs = this.testLocalNamespaces.get(3);
+        PersistencePolicies persistence = new PersistencePolicies(3, 2, 1, 
0.0);
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.setPersistence(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), persistence);
+        ArgumentCaptor<RestException> errorCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+        assertEquals(errorCaptor.getValue().getResponse().getStatus(), 
Response.Status.UNAUTHORIZED.getStatusCode());
     }
 
     @Test

Reply via email to