codelipenghui commented on code in PR #15608:
URL: https://github.com/apache/pulsar/pull/15608#discussion_r873256073


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void 
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> 
asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for 
namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or 
enabling broker side deduplication "
                                             + "for all topics in the specified 
namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for 
namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to modify broker deduplication config 
for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void 
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side 
deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean 
enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void 
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side 
deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean 
enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for 
namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to modify broker deduplication config 
for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void 
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> 
asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for 
namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or 
enabling broker side deduplication "
                                             + "for all topics in the specified 
namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for 
namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Remove broker side deduplication for all topics in 
a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void removeDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void removeDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(null);
+        internalModifyDeduplicationAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail remove broker deduplication config for 
namespace {}", namespaceName, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to remove broker deduplication config 
for namespace {}", namespaceName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void 
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> 
asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for 
namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or 
enabling broker side deduplication "
                                             + "for all topics in the specified 
namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))
+                .exceptionally(ex -> {
+                    log.error("Fail modify broker deduplication config for 
namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Remove broker side deduplication for all topics in 
a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void removeDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void removeDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(null);
+        internalModifyDeduplicationAsync(null)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void 
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> 
asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for 
namespace {}", namespace, ex);

Review Comment:
   ```suggestion
                       log.error("Failed to get broker deduplication config for 
namespace {}", namespace, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -513,10 +513,17 @@ public void 
removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
     @ApiOperation(hidden = true, value = "Enable or disable broker side 
deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
-    public void modifyDeduplication(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, boolean 
enableDeduplication) {
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
+                                    boolean enableDeduplication) {
         validateNamespaceName(property, cluster, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   From the logs, we can see that before this patch the HTTP response code was 
204 (No Content), so the async version should keep returning 204 instead of 200.
   
   ```
   2022-05-16T08:47:05,257 - INFO  - 
[pulsar-web-30-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - 
[16/May/2022:08:47:05 +0800] "POST 
/admin/v2/namespaces/my-property/my-ns/deduplication HTTP/1.1" 204 0 "-" 
"Pulsar-Java-v2.11.0-SNAPSHOT" 6
   ```
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -411,32 +411,53 @@ public void 
removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
     @ApiOperation(value = "Get broker side deduplication for all topics in a 
namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Boolean getDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
+    public void getDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDeduplication();
+        internalGetDeduplicationAsync()
+                .thenAccept(deduplication -> 
asyncResponse.resume(deduplication))
+                .exceptionally(ex -> {
+                    log.error("Fail get broker deduplication config for 
namespace {}", namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{tenant}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all 
topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public void modifyDeduplication(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
                                     @ApiParam(value = "Flag for disabling or 
enabling broker side deduplication "
                                             + "for all topics in the specified 
namespace", required = true)
                                             boolean enableDeduplication) {
         validateNamespaceName(tenant, namespace);
-        internalModifyDeduplication(enableDeduplication);
+        internalModifyDeduplicationAsync(enableDeduplication)
+                .thenAccept(__ -> asyncResponse.resume(Response.ok().build()))

Review Comment:
   ```suggestion
                   .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to