This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c692c78b241 [improve][broker][PIP-149]make
getMaxConsumersPerSubscription method async in Namespaces (#16507)
c692c78b241 is described below
commit c692c78b241b005b28a010ea1af0e99a97f0402b
Author: Qiang Huang <[email protected]>
AuthorDate: Mon Jul 11 19:20:14 2022 +0800
[improve][broker][PIP-149]make getMaxConsumersPerSubscription method async
in Namespaces (#16507)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 5 -----
.../org/apache/pulsar/broker/admin/v1/Namespaces.java | 18 ++++++++++++++----
.../org/apache/pulsar/broker/admin/v2/Namespaces.java | 14 ++++++++++++--
3 files changed, 26 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5bdae87c24f..91b59658ba0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2301,11 +2301,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected Integer internalGetMaxConsumersPerSubscription() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
- return
getNamespacePolicies(namespaceName).max_consumers_per_subscription;
- }
-
protected void internalSetMaxConsumersPerSubscription(Integer
maxConsumersPerSubscription) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index a86357198e0..79efbc988f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1325,11 +1325,21 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get maxConsumersPerSubscription config on a
namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public Integer getMaxConsumersPerSubscription(@PathParam("property")
String property,
- @PathParam("cluster") String
cluster,
- @PathParam("namespace") String
namespace) {
+ public void getMaxConsumersPerSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetMaxConsumersPerSubscription();
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(polices ->
asyncResponse.resume(polices.max_consumers_per_subscription))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get maxConsumersPerSubscription
config on namespace {}: {} ",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e4ed338e16d..0150e9fd754 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1515,10 +1515,20 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get maxConsumersPerSubscription config on a
namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String
tenant,
+ public void getMaxConsumersPerSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetMaxConsumersPerSubscription();
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(polices ->
asyncResponse.resume(polices.max_consumers_per_subscription))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get maxConsumersPerSubscription
config on namespace {}: {} ",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST