This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15b655cfea3 [improve][sec] Align some namespace level policy 
authorisation check (#21640)
15b655cfea3 is described below

commit 15b655cfea381c63e754bc8f066c311b097198bb
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Dec 4 22:15:19 2023 +0800

    [improve][sec] Align some namespace level policy authorisation check 
(#21640)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 30 +++++++++++++---------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  3 ++-
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c5174991298..caaff010439 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1178,7 +1178,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
     protected CompletableFuture<Void> internalSetPublishRateAsync(PublishRate 
maxPublishMessageRate) {
         log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), 
namespaceName, maxPublishMessageRate);
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             
policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(),
 maxPublishMessageRate);
             log.info("[{}] Successfully updated the publish_max_message_rate 
for cluster on namespace {}",
                     clientAppId(), namespaceName);
@@ -1207,7 +1208,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
     protected CompletableFuture<Void> internalRemovePublishRateAsync() {
         log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), 
namespaceName, topicName);
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             if (policies.publishMaxMessageRate != null) {
                 
policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());
             }
@@ -1227,7 +1229,8 @@ public abstract class NamespacesBase extends 
AdminResource {
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> 
internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) {
         log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), 
namespaceName, dispatchRate);
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             
policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), 
dispatchRate);
             
policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), 
dispatchRate);
             log.info("[{}] Successfully updated the dispatchRate for cluster 
on namespace {}", clientAppId(),
@@ -1237,7 +1240,8 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             
policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName());
             
policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName());
             log.info("[{}] Successfully delete the dispatchRate for cluster on 
namespace {}", clientAppId(),
@@ -1254,7 +1258,7 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) {
-        return validateSuperUserAccessAsync()
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
                 .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
                     
policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(),
 dispatchRate);
                     log.info("[{}] Successfully updated the 
subscriptionDispatchRate for cluster on namespace {}",
@@ -1264,7 +1268,7 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalDeleteSubscriptionDispatchRateAsync() {
-        return validateSuperUserAccessAsync()
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
                 .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
                     
policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
                     log.info("[{}] Successfully delete the 
subscriptionDispatchRate for cluster on namespace {}",
@@ -1282,7 +1286,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
     protected CompletableFuture<Void> 
internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
         log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), 
namespaceName, subscribeRate);
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             
policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), 
subscribeRate);
             log.info("[{}] Successfully updated the subscribeRate for cluster 
on namespace {}", clientAppId(),
                     namespaceName);
@@ -1291,7 +1296,8 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() {
-        return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
             
policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
             log.info("[{}] Successfully delete the subscribeRate for cluster 
on namespace {}", clientAppId(),
                     namespaceName);
@@ -1624,7 +1630,7 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected void internalSetInactiveTopic(InactiveTopicPolicies 
inactiveTopicPolicies) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
     }
@@ -2010,7 +2016,7 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     protected void internalSetMaxSubscriptionsPerTopic(Integer 
maxSubscriptionsPerTopic){
-        validateSuperUserAccess();
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
             throw new RestException(Status.PRECONDITION_FAILED,
@@ -2518,7 +2524,7 @@ public abstract class NamespacesBase extends 
AdminResource {
      * Notion: don't re-use this logic.
      */
     protected void internalSetReplicatorDispatchRate(AsyncResponse 
asyncResponse, DispatchRateImpl dispatchRate) {
-        validateSuperUserAccessAsync()
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
                 .thenAccept(__ -> {
                     log.info("[{}] Set namespace replicator dispatch-rate 
{}/{}",
                             clientAppId(), namespaceName, dispatchRate);
@@ -2563,7 +2569,7 @@ public abstract class NamespacesBase extends 
AdminResource {
      * Notion: don't re-use this logic.
      */
     protected void internalRemoveReplicatorDispatchRate(AsyncResponse 
asyncResponse) {
-        validateSuperUserAccessAsync()
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
                 .thenCompose(__ -> 
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
                     String clusterName = 
pulsar().getConfiguration().getClusterName();
                     policies.replicatorDispatchRate.remove(clusterName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 8c2195a9b9b..1e4ac7d9f5f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -466,7 +466,8 @@ public class Namespaces extends NamespacesBase {
                                                  @PathParam("tenant") String 
tenant,
                                                  @PathParam("namespace") 
String namespace) {
         validateNamespaceName(tenant, namespace);
-        validateAdminAccessForTenantAsync(tenant)
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.SUBSCRIPTION_EXPIRATION_TIME,
+                PolicyOperation.READ)
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
                 .thenAccept(policies -> 
asyncResponse.resume(policies.subscription_expiration_time_minutes))
                 .exceptionally(ex -> {

Reply via email to