zymap commented on a change in pull request #8948:
URL: https://github.com/apache/pulsar/pull/8948#discussion_r542978255



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1816,6 +1816,91 @@ public void removePersistence(@Suspended final 
AsyncResponse asyncResponse,
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Get maxSubscriptionsPerTopic config for specified 
topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> maxSubscriptionsPerTopic = 
internalGetMaxSubscriptionsPerTopic();
+            if (!maxSubscriptionsPerTopic.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(maxSubscriptionsPerTopic.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Set maxSubscriptionsPerTopic config for specified 
topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of 
maxSubscriptionsPerTopic")})
+    public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @ApiParam(value = "The max subscriptions of 
the topic") int maxSubscriptionsPerTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, 
ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Updating maxSubscriptionsPerTopic failed", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully updated maxSubscriptionsPerTopic: 
namespace={}, topic={}"
+                                + ", maxSubscriptions={}"
+                        , clientAppId(), namespaceName, 
topicName.getLocalName(), maxSubscriptionsPerTopic);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
+    @ApiOperation(value = "Remove maxSubscriptionsPerTopic config for 
specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {

Review comment:
       Is it OK to set the value to null here? Can we avoid doing that?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to