This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b0d489e440 [improve][broker] Make AutoSubscriptionCreation async (#16329)
7b0d489e440 is described below
commit 7b0d489e44021822b0cbbcff64d045c92d6edc35
Author: gaozhangmin <[email protected]>
AuthorDate: Wed Jul 13 17:30:05 2022 +0800
[improve][broker] Make AutoSubscriptionCreation async (#16329)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 59 ++++++++-----------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 68 ++++++++++++++++------
.../apache/pulsar/broker/admin/v2/Namespaces.java | 63 ++++++++++++++------
3 files changed, 120 insertions(+), 70 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 11361dd23bc..677a9532f62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -922,44 +922,35 @@ public abstract class NamespacesBase extends AdminResource {
}));
}
- protected void internalSetAutoSubscriptionCreation(
- AsyncResponse asyncResponse, AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
+ protected CompletableFuture<Void>
internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
+
autoSubscriptionCreationOverride) {
// Force to read the data s.t. the watch to the cache content is setup.
- namespaceResources().setPoliciesAsync(namespaceName, policies -> {
- policies.autoSubscriptionCreationOverride =
autoSubscriptionCreationOverride;
- return policies;
- }).thenApply(r -> {
- if (autoSubscriptionCreationOverride != null) {
- String autoOverride =
autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation() ? "enabled"
- : "disabled";
- log.info("[{}] Successfully {} autoSubscriptionCreation on
namespace {}", clientAppId(),
- autoOverride != null ? autoOverride : "removed",
namespaceName);
- }
- asyncResponse.resume(Response.noContent().build());
- return null;
- }).exceptionally(e -> {
- log.error("[{}] Failed to modify autoSubscriptionCreation status
on namespace {}", clientAppId(),
- namespaceName, e.getCause());
- if (e.getCause() instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
- return null;
- }
- asyncResponse.resume(new RestException(e.getCause()));
- return null;
- });
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.AUTO_SUBSCRIPTION_CREATION,
+ PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(unused ->
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+ policies.autoSubscriptionCreationOverride =
autoSubscriptionCreationOverride;
+ return policies;
+ }))
+ .thenAccept(r -> {
+ if (autoSubscriptionCreationOverride != null) {
+ String autoOverride =
autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation()
+ ? "enabled" : "disabled";
+ log.info("[{}] Successfully {}
autoSubscriptionCreation on namespace {}", clientAppId(),
+ autoOverride, namespaceName);
+ } else {
+ log.info("[{}] Successfully remove
autoSubscriptionCreation on namespace {}",
+ clientAppId(), namespaceName);
+ }
+ });
}
- protected AutoSubscriptionCreationOverride
internalGetAutoSubscriptionCreation() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.autoSubscriptionCreationOverride;
- }
+ protected CompletableFuture<AutoSubscriptionCreationOverride>
internalGetAutoSubscriptionCreationAsync() {
- protected void internalRemoveAutoSubscriptionCreation(AsyncResponse
asyncResponse) {
- internalSetAutoSubscriptionCreation(asyncResponse, null);
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.AUTO_SUBSCRIPTION_CREATION,
+ PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies ->
policies.autoSubscriptionCreationOverride);
}
protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean
enableDeduplication) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 5f2b8914293..673435fd4c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -724,14 +724,24 @@ public class Namespaces extends NamespacesBase {
@PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace,
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride)
{
- try {
- validateNamespaceName(property, cluster, namespace);
- internalSetAutoSubscriptionCreation(asyncResponse,
autoSubscriptionCreationOverride);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully set autoSubscriptionCreation
on namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to set autoSubscriptionCreation on
namespace {}", clientAppId(),
+ namespaceName, ex);
+ if (ex instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -739,11 +749,23 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist")})
- public AutoSubscriptionCreationOverride
getAutoSubscriptionCreation(@PathParam("property") String property,
+ public void getAutoSubscriptionCreation(@Suspended final AsyncResponse
asyncResponse,
+
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetAutoSubscriptionCreation();
+ internalGetAutoSubscriptionCreationAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("Failed to get autoSubscriptionCreation for
namespace {}", namespaceName, ex);
+ if (ex instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@DELETE
@@ -754,14 +776,24 @@ public class Namespaces extends NamespacesBase {
public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse
asyncResponse,
@PathParam("property") String
property, @PathParam("cluster") String cluster,
@PathParam("namespace") String
namespace) {
- try {
- validateNamespaceName(property, cluster, namespace);
- internalRemoveAutoSubscriptionCreation(asyncResponse);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalSetAutoSubscriptionCreationAsync(null)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully set autoSubscriptionCreation
on namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to remove autoSubscriptionCreation
on namespace {}", clientAppId(),
+ namespaceName, ex);
+ if (ex instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6cc786c5c01..ab7388cf3eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -655,14 +655,24 @@ public class Namespaces extends NamespacesBase {
@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
@ApiParam(value = "Settings for automatic subscription creation")
AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
- try {
- validateNamespaceName(tenant, namespace);
- internalSetAutoSubscriptionCreation(asyncResponse,
autoSubscriptionCreationOverride);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully set autoSubscriptionCreation
on namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to set autoSubscriptionCreation on
namespace {}", clientAppId(),
+ namespaceName, ex);
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -670,10 +680,17 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist")})
- public AutoSubscriptionCreationOverride
getAutoSubscriptionCreation(@PathParam("tenant") String tenant,
+ public void getAutoSubscriptionCreation(@Suspended final AsyncResponse
asyncResponse,
+
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetAutoSubscriptionCreation();
+ internalGetAutoSubscriptionCreationAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get autoSubscriptionCreation for
namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -683,14 +700,24 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse
asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- try {
- validateNamespaceName(tenant, namespace);
- internalRemoveAutoSubscriptionCreation(asyncResponse);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalSetAutoSubscriptionCreationAsync(null)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully set autoSubscriptionCreation
on namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to set autoSubscriptionCreation on
namespace {}", clientAppId(),
+ namespaceName, ex);
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET