codelipenghui commented on code in PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#discussion_r873256073
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication ->
asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Fail get broker deduplication config for
namespace {}", namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void modifyDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or
enabling broker side deduplication "
+ "for all topics in the specified
namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+ .exceptionally(ex -> {
+ log.error("Fail modify broker deduplication config for
namespace {}", namespaceName, ex);
Review Comment:
```suggestion
log.error("Failed to modify broker deduplication config
for namespace {}", namespaceName, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
@ApiOperation(hidden = true, value = "Enable or disable broker side
deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
- public void modifyDeduplication(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, boolean
enableDeduplication) {
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
+ boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
Review Comment:
```suggestion
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
@ApiOperation(hidden = true, value = "Enable or disable broker side
deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
- public void modifyDeduplication(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, boolean
enableDeduplication) {
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
+ boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+ .exceptionally(ex -> {
+ log.error("Fail modify broker deduplication config for
namespace {}", namespaceName, ex);
Review Comment:
```suggestion
log.error("Failed to modify broker deduplication config
for namespace {}", namespaceName, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication ->
asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Fail get broker deduplication config for
namespace {}", namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void modifyDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or
enabling broker side deduplication "
+ "for all topics in the specified
namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+ .exceptionally(ex -> {
+ log.error("Fail modify broker deduplication config for
namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Remove broker side deduplication for all topics in
a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void removeDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void removeDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(null);
+ internalModifyDeduplicationAsync(null)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+ .exceptionally(ex -> {
+ log.error("Fail remove broker deduplication config for
namespace {}", namespaceName, ex);
Review Comment:
```suggestion
log.error("Failed to remove broker deduplication config
for namespace {}", namespaceName, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication ->
asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Fail get broker deduplication config for
namespace {}", namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void modifyDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or
enabling broker side deduplication "
+ "for all topics in the specified
namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+ .exceptionally(ex -> {
+ log.error("Fail modify broker deduplication config for
namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Remove broker side deduplication for all topics in
a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void removeDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void removeDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(null);
+ internalModifyDeduplicationAsync(null)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
Review Comment:
```suggestion
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication ->
asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Fail get broker deduplication config for
namespace {}", namespace, ex);
Review Comment:
```suggestion
log.error("Failed to get broker deduplication config for
namespace {}", namespace, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
@ApiOperation(hidden = true, value = "Enable or disable broker side
deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
- public void modifyDeduplication(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, boolean
enableDeduplication) {
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
+ boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
Review Comment:
From the logs, we can see that before this patch the HTTP response code was
204, so this should keep returning 204 (`Response.noContent()`) rather than
200 (`Response.ok()`).
```
2022-05-16T08:47:05,257 - INFO -
[pulsar-web-30-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - -
[16/May/2022:08:47:05 +0800] "POST
/admin/v2/namespaces/my-property/my-ns/deduplication HTTP/1.1" 204 0 "-"
"Pulsar-Java-v2.11.0-SNAPSHOT" 6
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a
namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Boolean getDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
+ public void getDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetDeduplication();
+ internalGetDeduplicationAsync()
+ .thenAccept(deduplication ->
asyncResponse.resume(deduplication))
+ .exceptionally(ex -> {
+ log.error("Fail get broker deduplication config for
namespace {}", namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all
topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public void modifyDeduplication(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or
enabling broker side deduplication "
+ "for all topics in the specified
namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
- internalModifyDeduplication(enableDeduplication);
+ internalModifyDeduplicationAsync(enableDeduplication)
+ .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
Review Comment:
```suggestion
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]