This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0d489e440 [improve][broker] Make AutoSubscriptionCreation async (#16329)
7b0d489e440 is described below

commit 7b0d489e44021822b0cbbcff64d045c92d6edc35
Author: gaozhangmin <[email protected]>
AuthorDate: Wed Jul 13 17:30:05 2022 +0800

    [improve][broker] Make AutoSubscriptionCreation async (#16329)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 59 ++++++++-----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 68 ++++++++++++++++------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 63 ++++++++++++++------
 3 files changed, 120 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 11361dd23bc..677a9532f62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -922,44 +922,35 @@ public abstract class NamespacesBase extends AdminResource {
                 }));
     }
 
-    protected void internalSetAutoSubscriptionCreation(
-            AsyncResponse asyncResponse, AutoSubscriptionCreationOverride 
autoSubscriptionCreationOverride) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
+    protected CompletableFuture<Void> 
internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
+                                                               
autoSubscriptionCreationOverride) {
         // Force to read the data s.t. the watch to the cache content is setup.
-        namespaceResources().setPoliciesAsync(namespaceName, policies -> {
-            policies.autoSubscriptionCreationOverride = 
autoSubscriptionCreationOverride;
-            return policies;
-        }).thenApply(r -> {
-            if (autoSubscriptionCreationOverride != null) {
-                String autoOverride = 
autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation() ? "enabled"
-                        : "disabled";
-                log.info("[{}] Successfully {} autoSubscriptionCreation on 
namespace {}", clientAppId(),
-                        autoOverride != null ? autoOverride : "removed", 
namespaceName);
-            }
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        }).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoSubscriptionCreation status 
on namespace {}", clientAppId(),
-                    namespaceName, e.getCause());
-            if (e.getCause() instanceof NotFoundException) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-                return null;
-            }
-            asyncResponse.resume(new RestException(e.getCause()));
-            return null;
-        });
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.AUTO_SUBSCRIPTION_CREATION,
+                PolicyOperation.WRITE)
+                .thenCompose(__ ->  validatePoliciesReadOnlyAccessAsync())
+                        .thenCompose(unused -> 
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+                            policies.autoSubscriptionCreationOverride = 
autoSubscriptionCreationOverride;
+                            return policies;
+                        }))
+                .thenAccept(r -> {
+                    if (autoSubscriptionCreationOverride != null) {
+                        String autoOverride = 
autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation()
+                                ? "enabled" : "disabled";
+                        log.info("[{}] Successfully {} 
autoSubscriptionCreation on namespace {}", clientAppId(),
+                                autoOverride, namespaceName);
+                    } else {
+                        log.info("[{}] Successfully remove 
autoSubscriptionCreation on namespace {}",
+                                clientAppId(), namespaceName);
+                    }
+                });
     }
 
-    protected AutoSubscriptionCreationOverride 
internalGetAutoSubscriptionCreation() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.autoSubscriptionCreationOverride;
-    }
+    protected CompletableFuture<AutoSubscriptionCreationOverride> 
internalGetAutoSubscriptionCreationAsync() {
 
-    protected void internalRemoveAutoSubscriptionCreation(AsyncResponse 
asyncResponse) {
-        internalSetAutoSubscriptionCreation(asyncResponse, null);
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.AUTO_SUBSCRIPTION_CREATION,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> 
policies.autoSubscriptionCreationOverride);
     }
 
     protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean 
enableDeduplication) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 5f2b8914293..673435fd4c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -724,14 +724,24 @@ public class Namespaces extends NamespacesBase {
             @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace,
             AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) 
{
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalSetAutoSubscriptionCreation(asyncResponse, 
autoSubscriptionCreationOverride);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully set autoSubscriptionCreation 
on namespace {}",
+                            clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to set autoSubscriptionCreation on 
namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    if (ex instanceof NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -739,11 +749,23 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist")})
-    public AutoSubscriptionCreationOverride 
getAutoSubscriptionCreation(@PathParam("property") String property,
+    public void getAutoSubscriptionCreation(@Suspended final AsyncResponse 
asyncResponse,
+                                                                        
@PathParam("property") String property,
                                                                         
@PathParam("cluster") String cluster,
                                                                         
@PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetAutoSubscriptionCreation();
+        internalGetAutoSubscriptionCreationAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("Failed to get autoSubscriptionCreation for 
namespace {}", namespaceName, ex);
+                    if (ex instanceof NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @DELETE
@@ -754,14 +776,24 @@ public class Namespaces extends NamespacesBase {
     public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse 
asyncResponse,
                                         @PathParam("property") String 
property, @PathParam("cluster") String cluster,
                                         @PathParam("namespace") String 
namespace) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalRemoveAutoSubscriptionCreation(asyncResponse);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalSetAutoSubscriptionCreationAsync(null)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully set autoSubscriptionCreation 
on namespace {}",
+                            clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to remove autoSubscriptionCreation 
on namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    if (ex instanceof NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6cc786c5c01..ab7388cf3eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -655,14 +655,24 @@ public class Namespaces extends NamespacesBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @ApiParam(value = "Settings for automatic subscription creation")
                     AutoSubscriptionCreationOverride 
autoSubscriptionCreationOverride) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalSetAutoSubscriptionCreation(asyncResponse, 
autoSubscriptionCreationOverride);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully set autoSubscriptionCreation 
on namespace {}",
+                            clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to set autoSubscriptionCreation on 
namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -670,10 +680,17 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist")})
-    public AutoSubscriptionCreationOverride 
getAutoSubscriptionCreation(@PathParam("tenant") String tenant,
+    public void getAutoSubscriptionCreation(@Suspended final AsyncResponse 
asyncResponse,
+                                                                        
@PathParam("tenant") String tenant,
                                                                         
@PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetAutoSubscriptionCreation();
+        internalGetAutoSubscriptionCreationAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get autoSubscriptionCreation for 
namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -683,14 +700,24 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
     public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse 
asyncResponse,
                                         @PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalRemoveAutoSubscriptionCreation(asyncResponse);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalSetAutoSubscriptionCreationAsync(null)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully set autoSubscriptionCreation 
on namespace {}",
+                            clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to set autoSubscriptionCreation on 
namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET

Reply via email to