shibd commented on code in PR #15656:
URL: https://github.com/apache/pulsar/pull/15656#discussion_r876798457
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate
internalGetSubscriptionDispatchRate() {
return
policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
- protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- validateSuperUserAccess();
+ protected CompletableFuture<Void>
internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(),
namespaceName, subscribeRate);
- try {
+ return validateSuperUserAccessAsync().thenAccept(__ -> {
updatePolicies(namespaceName, policies -> {
policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(),
subscribeRate);
return policies;
});
log.info("[{}] Successfully updated the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
+ }).exceptionally(ex -> {
log.error("[{}] Failed to update the subscribeRate for cluster on
namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ namespaceName, ex);
+ throw new RestException(ex);
+ });
}
- protected void internalDeleteSubscribeRate() {
- validateSuperUserAccess();
- try {
+ protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() {
+ return validateSuperUserAccessAsync().thenAccept(__ -> {
updatePolicies(namespaceName, policies -> {
policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
+ }).exceptionally(ex -> {
log.error("[{}] Failed to delete the subscribeRate for cluster on
namespace {}", clientAppId(),
Review Comment:
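Exception handling can be left to the REST layer: the `deleteSubscribeRate`
endpoint in Namespaces.java already logs the failure and resumes the async
response exceptionally, so this `exceptionally` block is not needed here.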
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate
internalGetSubscriptionDispatchRate() {
return
policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
- protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- validateSuperUserAccess();
+ protected CompletableFuture<Void>
internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(),
namespaceName, subscribeRate);
- try {
+ return validateSuperUserAccessAsync().thenAccept(__ -> {
updatePolicies(namespaceName, policies -> {
policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(),
subscribeRate);
return policies;
});
log.info("[{}] Successfully updated the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
+ }).exceptionally(ex -> {
log.error("[{}] Failed to update the subscribeRate for cluster on
namespace {}", clientAppId(),
Review Comment:
Also, please log the error only once; the failure is currently logged both
here and in the REST layer.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -783,31 +783,52 @@ public void
deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
@DELETE
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Delete subscribe-rate throttling for all topics of
the namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void deleteSubscribeRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
+ public void deleteSubscribeRate(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeleteSubscribeRate();
+ internalDeleteSubscribeRateAsync()
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to delete the subscribeRate for cluster
on namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the
namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void setSubscribeRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @ApiParam(value = "Subscribe rate for all topics of the specified
namespace") SubscribeRate subscribeRate) {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
+ public void setSubscribeRate(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Subscribe rate for all
topics of the specified namespace")
+ SubscribeRate subscribeRate) {
validateNamespaceName(tenant, namespace);
- internalSetSubscribeRate(subscribeRate);
+ internalSetSubscribeRateAsync(subscribeRate)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to delete the subscribeRate for cluster
on namespace {}", namespaceName, ex);
Review Comment:
```suggestion
log.error("Failed to update the subscribeRate for
cluster on namespace {}", namespaceName, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1373,43 +1373,41 @@ protected DispatchRate
internalGetSubscriptionDispatchRate() {
return
policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
- protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- validateSuperUserAccess();
+ protected CompletableFuture<Void>
internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(),
namespaceName, subscribeRate);
- try {
+ return validateSuperUserAccessAsync().thenAccept(__ -> {
updatePolicies(namespaceName, policies -> {
policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(),
subscribeRate);
return policies;
});
log.info("[{}] Successfully updated the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
+ }).exceptionally(ex -> {
log.error("[{}] Failed to update the subscribeRate for cluster on
namespace {}", clientAppId(),
Review Comment:
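Exception handling can be left to the REST layer: the `setSubscribeRate`
endpoint in Namespaces.java already logs the failure and resumes the async
response exceptionally, so this `exceptionally` block is not needed here.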
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]