codelipenghui commented on a change in pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784#discussion_r467850364



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void 
internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new 
TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new 
TypeReference<List<String>>() {});
+            Map policiesMap = 
jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), 
Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return 
jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), 
TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policy 
properties");
+        } catch (IOException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "serialize the 
result failed");
+        }
+    }
+
+    protected void internalSetTopicPolicies(AsyncResponse asyncResponse, 
Map<String, String> policies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = 
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        try {
+            Map oldPolicies = 
jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), 
Map.class);
+            oldPolicies.putAll(policies);
+            topicPolicies = 
jsonMapper().readValue(jsonMapper().writeValueAsString(oldPolicies), 
TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            log.error("incorrect policies map", e);
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policies 
map");

Review comment:
       Please complete the `asyncResponse` with exception

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -246,6 +246,41 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/{policyProperties}/getTopicPolicies")
+    @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public TopicPolicies getTopicPolicies(

Review comment:
       It's better to keep consistent with the delayed delivery setting of the 
namespace. 
   
   ```
       @GET
       @Path("/{tenant}/{namespace}/delayedDelivery")
       @ApiOperation(value = "Get delayed delivery messages config on a 
namespace.")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or 
namespace doesn't exist"),
               @ApiResponse(code = 409, message = "Concurrent modification"), })
       public DelayedDeliveryPolicies 
getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
                                            @PathParam("namespace") String 
namespace) {
           validateNamespaceName(tenant, namespace);
           return internalGetDelayedDelivery();
       }
   
       @POST
       @Path("/{tenant}/{namespace}/delayedDelivery")
       @ApiOperation(value = "Set delayed delivery messages config on a 
namespace.")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or 
namespace doesn't exist"), })
       public void setDelayedDeliveryPolicies(@PathParam("tenant") String 
tenant,
               @PathParam("namespace") String namespace,
               @ApiParam(value = "Delayed delivery policies for the specified 
namespace") DelayedDeliveryPolicies deliveryPolicies) {
           validateNamespaceName(tenant, namespace);
           internalSetDelayedDelivery(deliveryPolicies);
       }
   ```
   
   And we should also add deleteDelayedDeliveryPolicies method for clearing the 
topic level delayed delivery policy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to