This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 441fd68d8a5753682252cad29264f771e4cd1f34
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

    [improve][broker] Consistently add fine-grain authorization to REST API (#22202)
    
    (cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 223 +++++++++------
 .../broker/admin/TopicPoliciesAuthZTest.java       | 308 +++++++++++++++++++++
 2 files changed, 454 insertions(+), 77 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d4690498de1..337a806d920 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@@ -388,7 +389,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -411,7 +413,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -433,7 +436,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -457,7 +461,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -481,7 +486,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Max unacked messages on consumer policies for 
the specified topic")
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -503,7 +509,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -526,7 +533,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
             .thenAccept(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
@@ -554,7 +562,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetDeduplicationSnapshotInterval(interval, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -576,7 +585,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -600,7 +610,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, 
isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getInactiveTopicPolicies", ex, 
asyncResponse);
@@ -623,7 +634,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "inactive topic policies for the specified 
topic")
             InactiveTopicPolicies inactiveTopicPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -645,7 +657,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetInactiveTopicPolicies(null, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -669,7 +682,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -744,7 +758,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, 
isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -1847,16 +1862,17 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalExamineMessageAsync(initialPosition, messagePosition, 
authoritative)
-                .thenAccept(asyncResponse::resume)
-                .exceptionally(ex -> {
-                    if (isNot307And404Exception(ex)) {
-                        log.error("[{}] Failed to examine a specific message 
on the topic {}", clientAppId(), topicName,
-                                ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
+        validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+            .thenCompose(__ -> internalExamineMessageAsync(initialPosition, 
messagePosition, authoritative))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (isNot307And404Exception(ex)) {
+                    log.error("[{}] Failed to examine a specific message on 
the topic {}", clientAppId(), topicName,
+                            ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
@@ -1962,7 +1978,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalGetBacklogAsync(authoritative)
+        validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+                .thenCompose(__ ->  internalGetBacklogAsync(authoritative))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -2018,7 +2035,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal))
             .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2096,7 +2114,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                       + "For internal use.")
                               @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
                 .thenAccept(op -> {
                     
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
 -> {
@@ -2178,7 +2197,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
             .thenAccept(op -> asyncResponse.resume(op
                 .map(TopicPolicies::getMessageTTLInSeconds)
@@ -2215,7 +2235,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2242,7 +2263,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2268,7 +2290,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2295,7 +2318,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "DeduplicationEnabled policies for the specified 
topic")
                     Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2320,7 +2344,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDeduplication(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2444,7 +2469,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetPersistence(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2472,7 +2498,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Bookkeeper persistence policies for specified 
topic")
             PersistencePolicies persistencePolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetPersistence(persistencePolicies, 
isGlobal))
             .thenRun(() -> {
                 try {
@@ -2508,7 +2535,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemovePersistence(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove persistence policies: 
namespace={}, topic={}",
@@ -2539,7 +2567,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -2567,7 +2596,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "The max subscriptions of the topic") int 
maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated maxSubscriptionsPerTopic: 
namespace={}, topic={}"
@@ -2597,7 +2627,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, 
isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove maxSubscriptionsPerTopic: 
namespace={}, topic={}",
@@ -2627,7 +2658,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION_RATE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetReplicatorDispatchRate(applied, 
isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2654,7 +2686,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Replicator dispatch rate of the topic") 
DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, 
isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated replicatorDispatchRate: 
namespace={}, topic={}"
@@ -2684,7 +2717,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetReplicatorDispatchRate(null, 
isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove replicatorDispatchRate 
limit: namespace={}, topic={}",
@@ -2714,7 +2748,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxProducers(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2741,7 +2776,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "The max producers of the topic") int 
maxProducers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated max producers: 
namespace={}, topic={}, maxProducers={}",
@@ -2773,7 +2809,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveMaxProducers(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max producers: 
namespace={}, topic={}",
@@ -2805,7 +2842,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2832,7 +2870,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "The max consumers of the topic") int 
maxConsumers) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated max consumers: 
namespace={}, topic={}, maxConsumers={}",
@@ -2864,7 +2903,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveMaxConsumers(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max consumers: 
namespace={}, topic={}",
@@ -2895,7 +2935,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetMaxMessageSize(isGlobal))
             .thenAccept(policies -> {
                 asyncResponse.resume(policies.isPresent() ? policies.get() : 
Response.noContent().build());
@@ -2924,7 +2965,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "The max message size of the topic") int 
maxMessageSize) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize, 
isGlobal))
             .thenRun(() -> {
                 log.info(
@@ -2958,7 +3000,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove max message size: 
namespace={}, topic={}",
@@ -3228,7 +3271,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetDispatchRate(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3254,7 +3298,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") 
DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3290,7 +3335,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveDispatchRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic dispatch rate: 
tenant={}, namespace={}, topic={}",
@@ -3323,7 +3369,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, 
isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3351,7 +3398,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Subscription message dispatch rate for the 
specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3387,7 +3435,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalRemoveSubscriptionDispatchRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic subscription dispatch 
rate: tenant={}, namespace={}, topic={}",
@@ -3421,7 +3470,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), applied, isGlobal))
             .thenApply(asyncResponse::resume)
@@ -3451,7 +3501,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Subscription message dispatch rate for the 
specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), dispatchRate, 
isGlobal))
             .thenRun(() -> {
@@ -3489,7 +3540,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
                     Codec.decode(encodedSubscriptionName), isGlobal))
             .thenRun(() -> {
@@ -3521,7 +3573,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetCompactionThreshold(applied, 
isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -3547,7 +3600,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") long 
compactionThreshold) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetCompactionThreshold(compactionThreshold, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3583,7 +3637,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic compaction threshold: 
tenant={}, namespace={}, topic={}",
@@ -3615,7 +3670,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalGetMaxConsumersPerSubscription(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -3643,7 +3699,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") int 
maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3679,7 +3736,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalRemoveMaxConsumersPerSubscription(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic max consumers per 
subscription:"
@@ -3712,7 +3770,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetPublishRate(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
@@ -3739,7 +3798,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") 
PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3776,7 +3836,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemovePublishRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic publish rate: 
tenant={}, namespace={}, topic={}, isGlobal={}",
@@ -3809,7 +3870,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
             .thenAccept(op -> {
                 asyncResponse.resume(op.isPresent() ? op.get()
@@ -3839,7 +3901,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Enable sub types for the specified topic")
             Set<SubscriptionType> subscriptionTypesEnabled) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> 
internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
             .thenRun(() -> {
                 try {
@@ -3875,7 +3938,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> 
internalRemoveSubscriptionTypesEnabled(isGlobal))
                 .thenRun(() -> {
                     log.info("[{}] Successfully remove subscription types 
enabled: namespace={}, topic={}",
@@ -3907,7 +3971,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
                 .thenApply(asyncResponse::resume).exceptionally(ex -> {
             handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
@@ -3933,7 +3998,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") 
SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetSubscribeRate(subscribeRate, 
isGlobal))
             .thenRun(() -> {
                 try {
@@ -3971,7 +4037,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") 
SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
             .thenRun(() -> {
                 log.info(
@@ -4215,7 +4282,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                                     + "broker. For internal 
use.")
                                             @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> 
internalGetSchemaValidationEnforced(applied))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
@@ -4242,7 +4310,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                             @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
                                             @ApiParam(required = true) boolean 
schemaValidationEnforced) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> 
internalSetSchemaValidationEnforced(schemaValidationEnforced))
                 .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index b1a1f5bebc8..5966fe81e44 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -38,6 +38,8 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -217,4 +219,310 @@ public final class TopicPoliciesAuthZTest extends 
MockedPulsarServiceBaseTest {
             
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", 
subject);
         }
     }
+    /**
+     * Checks who may set, read and remove the offload policies of a topic through the topic-policies admin API.
+     *
+     * <p>The superuser client and the tenant-manager client are expected to succeed on all three operations.
+     * A client authenticated as a freshly generated subject is expected to get
+     * {@link PulsarAdminException.NotAuthorizedException} on all three, both with no grants and while holding
+     * each {@link AuthAction} on the {@code public/default} namespace in turn.
+     */
+    @SneakyThrows
+    @Test
+    public void testOffloadPolicy() {
+        // Random topic name and random subject, so this test does not share state with the other tests.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        // Admin client authenticated with the token of the random subject; closed automatically via @Cleanup.
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsar().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // Policy value that is written and then expected back from the broker.
+        final OffloadPoliciesImpl definedOffloadPolicies = new 
OffloadPoliciesImpl();
+        definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+        
definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+        definedOffloadPolicies.setManagedLedgerOffloadDriver("s3");
+        definedOffloadPolicies.setManagedLedgerOffloadBucket("buck");
+
+        // Superuser: set, read back and remove must all succeed.
+        superUserAdmin.topicPolicies().setOffloadPolicies(topic, 
definedOffloadPolicies);
+
+        // Topic policies are eventually consistent, so poll until the written value becomes visible.
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = 
superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+        });
+        superUserAdmin.topicPolicies().removeOffloadPolicies(topic);
+
+        // Poll again until the removal is visible (the getter then returns null).
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = 
superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertNull(offloadPolicy);
+        });
+
+        // Tenant manager: same set / read / remove round trip must succeed.
+        // NOTE(review): tenantManagerAdmin is declared outside this hunk; presumably a client for the
+        // tenant's admin role. Confirm against the test class setup.
+
+        tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, 
definedOffloadPolicies);
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = 
tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+        });
+        tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic);
+        await().untilAsserted(() -> {
+            final OffloadPolicies offloadPolicy = 
tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.assertNull(offloadPolicy);
+        });
+
+        // Subject without any grant: get, set and remove must each be rejected as not authorized.
+        // Assert.fail raises an AssertionError, which the catch clauses below do not intercept.
+
+        try {
+            subAdmin.topicPolicies().getOffloadPolicies(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setOffloadPolicies(topic, 
definedOffloadPolicies);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeOffloadPolicies(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        // Subject holding exactly one namespace-level AuthAction at a time: every operation must still be
+        // rejected. The grant is revoked at the end of each iteration.
+        for (AuthAction action : AuthAction.values()) {
+            
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getOffloadPolicies(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setOffloadPolicies(topic, 
definedOffloadPolicies);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeOffloadPolicies(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+            
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", 
subject);
+        }
+    }
+
+    /**
+     * Checks who may set, read and remove the max-unacked-messages-on-consumer policy of a topic through the
+     * topic-policies admin API.
+     *
+     * <p>The superuser client and the tenant-manager client are expected to succeed on all three operations.
+     * A client authenticated as a freshly generated subject is expected to get
+     * {@link PulsarAdminException.NotAuthorizedException} on all three, both with no grants and while holding
+     * each {@link AuthAction} on the {@code public/default} namespace in turn.
+     */
+    @SneakyThrows
+    @Test
+    public void testMaxUnackedMessagesOnConsumer() {
+        // Random topic name and random subject, so this test does not share state with the other tests.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        // Admin client authenticated with the token of the random subject; closed automatically via @Cleanup.
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsar().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // Policy value that is written and then expected back from the broker.
+        int definedUnackedMessagesOnConsumer = 100;
+
+        // Superuser: set, read back and remove must all succeed.
+        superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+
+        // Topic policies are eventually consistent, so poll until the written value becomes visible.
+        await().untilAsserted(() -> {
+            // Keep the result boxed: the getter can still return null at this point (the removal checks below
+            // rely on null meaning "not set"). Unboxing null into an int would throw a NullPointerException,
+            // which untilAsserted propagates instead of retrying; a failed assertion is retried.
+            final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, Integer.valueOf(definedUnackedMessagesOnConsumer));
+        });
+        superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+
+        // Poll again until the removal is visible (the getter then returns null).
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // Tenant manager: same set / read / remove round trip must succeed.
+        // NOTE(review): tenantManagerAdmin is declared outside this hunk; presumably a client for the
+        // tenant's admin role. Confirm against the test class setup.
+
+        tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+        await().untilAsserted(() -> {
+            // Boxed for the same reason as in the superuser check above.
+            final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertEquals(unackedMessagesOnConsumer, Integer.valueOf(definedUnackedMessagesOnConsumer));
+        });
+        tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+        await().untilAsserted(() -> {
+            final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnConsumer(topic);
+            Assert.assertNull(unackedMessagesOnConsumer);
+        });
+
+        // Subject without any grant: get, set and remove must each be rejected as not authorized.
+        // Assert.fail raises an AssertionError, which the catch clauses below do not intercept.
+
+        try {
+            subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // Subject holding exactly one namespace-level AuthAction at a time: every operation must still be
+        // rejected. The grant is revoked at the end of each iteration.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+    }
+
+    /**
+     * Verifies authorization of the max-unacked-messages-on-subscription topic policy endpoints.
+     *
+     * <p>The superuser and tenant-manager clients must be able to set, read back and remove the
+     * policy. A subject without any grant, and the same subject holding any single namespace-level
+     * {@link AuthAction}, must get {@link PulsarAdminException.NotAuthorizedException} on get, set
+     * and remove.
+     */
+    @SneakyThrows
+    @Test
+    public void testMaxUnackedMessagesOnSubscription() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsar().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // mocked data
+        final int definedMaxUnacked = 100;
+
+        // test superuser
+        superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedMaxUnacked);
+
+        // Topic policies are eventually consistent, so poll until the new value is visible.
+        await().untilAsserted(() -> {
+            // Keep the boxed type: the getter can return null while the policy is not visible yet,
+            // and unboxing null would throw a NullPointerException, which untilAsserted does not
+            // retry (only AssertionError is retried).
+            final Integer maxUnacked = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertEquals(maxUnacked, Integer.valueOf(definedMaxUnacked));
+        });
+        superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+
+        await().untilAsserted(() -> {
+            final Integer maxUnacked = superUserAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertNull(maxUnacked);
+        });
+
+        // test tenant manager
+        tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedMaxUnacked);
+        await().untilAsserted(() -> {
+            // Boxed for the same reason as above: a null result must fail as an assertion, not an NPE.
+            final Integer maxUnacked = tenantManagerAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertEquals(maxUnacked, Integer.valueOf(definedMaxUnacked));
+        });
+        tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+        await().untilAsserted(() -> {
+            final Integer maxUnacked = tenantManagerAdmin.topicPolicies()
+                    .getMaxUnackedMessagesOnSubscription(topic);
+            Assert.assertNull(maxUnacked);
+        });
+
+        // test nobody: a subject without any grant is rejected on every call
+        try {
+            subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedMaxUnacked);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions: granting any single namespace-level action must still
+        // leave the topic-policy endpoints unauthorized
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedMaxUnacked);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            // Reset the grant before trying the next action so each iteration tests one action only.
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+    }
 }

Reply via email to