zhanghaou commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r469820223



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse 
asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       checkTopicLevelPolicyEnable() is already called inside getTopicPolicies(),
so the explicit call here is redundant and can be removed.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse 
asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Same as above: this explicit checkTopicLevelPolicyEnable() call is
redundant and can be removed.

##########
File path: 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String 
subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires 
Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain 
messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws 
PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic 
asynchronously.

Review comment:
       This Javadoc needs to be corrected: "for all the topics on a topic"
should read "for a topic".

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse 
asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    
topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = 
policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for 
topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Retention Quota must exceed configured backlog quota for 
topic. " +
+                            "Please increase retention quota and retry");
+        }
+        topicPolicies.setRetentionPolicies(retention);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveRetention() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Same as above: this explicit checkTopicLevelPolicyEnable() call is
redundant and can be removed.

##########
File path: 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String 
subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires 
Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain 
messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws 
PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic 
asynchronously.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires 
Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain 
messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies 
retention);
+
+    /**
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Response example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain 
messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    RetentionPolicies getRetention(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the retention configuration for a topic asynchronously.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<RetentionPolicies> getRetentionAsync(String topic);
+
+    /**
+     * Remove the retention configuration for all the topics on a topic.

Review comment:
       This Javadoc needs to be corrected: "for all the topics on a topic"
should read "for a topic".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to