This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b51b74883fb [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309)
b51b74883fb is described below

commit b51b74883fb66673161d0b73c6a7257d073c57a5
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Wed Mar 20 20:37:32 2024 +0800

    [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 166 +++++-----
 .../broker/admin/impl/PersistentTopicsBase.java    | 161 +++++-----
 .../pulsar/broker/admin/NamespaceAuthZTest.java    | 164 ++++++++++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 346 +++++++++++++++++++++
 6 files changed, 680 insertions(+), 165 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 162c44bec38..9e9f2a446de 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -542,6 +542,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                             case COMPACT:
                             case OFFLOAD:
                             case UNLOAD:
+                            case TRIM_TOPIC:
                             case DELETE_METADATA:
                             case UPDATE_METADATA:
                             case ADD_BUNDLE_RANGE:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index e5806b7bec2..f048d650f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -59,8 +59,6 @@ import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -710,10 +708,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> 
getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)
-                .thenCompose((__) -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+        return 
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Failed to get schema compatibility 
strategy of topic {} {}",
                                 clientAppId(), topicName, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 46b1712e7dd..72f5f1439d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2378,102 +2378,110 @@ public abstract class NamespacesBase extends AdminResource {
    }
 
    protected void internalSetProperty(String key, String value, AsyncResponse 
asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-       updatePoliciesAsync(namespaceName, policies -> {
-           policies.properties.put(key, value);
-           return policies;
-       }).thenAccept(v -> {
-           log.info("[{}] Successfully set property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName);
-           asyncResponse.resume(Response.noContent().build());
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to set property for key {} on namespace {}", 
clientAppId(), key,
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   policies.properties.put(key, value);
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   log.info("[{}] Successfully set property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName);
+                   asyncResponse.resume(Response.noContent().build());
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to set property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalSetProperties(Map<String, String> properties, 
AsyncResponse asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-       updatePoliciesAsync(namespaceName, policies -> {
-           policies.properties.putAll(properties);
-           return policies;
-       }).thenAccept(v -> {
-           log.info("[{}] Successfully set {} properties on namespace {}", 
clientAppId(), properties.size(),
-                   namespaceName);
-           asyncResponse.resume(Response.noContent().build());
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to set {} properties on namespace {}", 
clientAppId(), properties.size(),
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   policies.properties.putAll(properties);
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   log.info("[{}] Successfully set {} properties on namespace 
{}", clientAppId(), properties.size(),
+                           namespaceName);
+                   asyncResponse.resume(Response.noContent().build());
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to set {} properties on namespace 
{}", clientAppId(), properties.size(),
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalGetProperty(String key, AsyncResponse asyncResponse) 
{
-        getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
-            asyncResponse.resume(policies.properties.get(key));
-        }).exceptionally(ex -> {
-            Throwable cause = ex.getCause();
-            log.error("[{}] Failed to get property for key {} of namespace 
{}", clientAppId(), key,
-                    namespaceName, cause);
-            asyncResponse.resume(cause);
-            return null;
-        });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+               .thenAccept(policies -> 
asyncResponse.resume(policies.properties.get(key)))
+               .exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to get property for key {} of 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalGetProperties(AsyncResponse asyncResponse) {
-       getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
-           asyncResponse.resume(policies.properties);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to get properties of namespace {}", 
clientAppId(), namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+               .thenAccept(policies -> 
asyncResponse.resume(policies.properties))
+               .exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to get properties of namespace {}", 
clientAppId(), namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalRemoveProperty(String key, AsyncResponse 
asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-
        AtomicReference<String> oldVal = new AtomicReference<>(null);
-       updatePoliciesAsync(namespaceName, policies -> {
-           oldVal.set(policies.properties.remove(key));
-           return policies;
-       }).thenAccept(v -> {
-           asyncResponse.resume(oldVal.get());
-           log.info("[{}] Successfully remove property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to remove property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-          return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   oldVal.set(policies.properties.remove(key));
+                   return policies;
+               })).thenAccept(v -> {
+                   asyncResponse.resume(oldVal.get());
+                   log.info("[{}] Successfully remove property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName);
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to remove property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalClearProperties(AsyncResponse asyncResponse) {
-       validatePoliciesReadOnlyAccess();
        AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
-       updatePoliciesAsync(namespaceName, policies -> {
-           clearedCount.set(policies.properties.size());
-           policies.properties.clear();
-           return policies;
-       }).thenAccept(v -> {
-           asyncResponse.resume(Response.noContent().build());
-           log.info("[{}] Successfully clear {} properties on namespace {}", 
clientAppId(), clearedCount.get(),
-                   namespaceName);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to clear {} properties on namespace {}", 
clientAppId(), clearedCount.get(),
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   clearedCount.set(policies.properties.size());
+                   policies.properties.clear();
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   asyncResponse.resume(Response.noContent().build());
+                   log.info("[{}] Successfully clear {} properties on 
namespace {}", clientAppId(), clearedCount.get(),
+                           namespaceName);
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to clear {} properties on namespace 
{}", clientAppId(), clearedCount.get(),
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, 
Function<Policies, Policies> updateFunction) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0c523726335..9c5c369a7b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -572,7 +572,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
         getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
             if (metadata != null) {
-                tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+                CompletableFuture<Void> future = 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+                        NamespaceOperation.CREATE_TOPIC);
+                future.thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
                     asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(e -> {
                     log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName);
@@ -911,13 +913,13 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-       future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
            // If the topic name is a partition name, no need to get partition 
topic metadata again
            if (topicName.isPartitioned()) {
                if (isTransactionCoordinatorAssign(topicName)) {
@@ -1134,13 +1136,12 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     private void internalUnloadNonPartitionedTopicAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
-                        .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
                         .thenCompose(topic -> topic.close(false))
                         .thenRun(() -> {
                             log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
                             asyncResponse.resume(Response.noContent().build());
-                        }))
+                        })
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isNot307And404Exception(ex)) {
@@ -1153,8 +1154,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalUnloadTransactionCoordinatorAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
-                        .thenCompose(v -> pulsar()
+                .thenCompose(v -> pulsar()
                                 .getTransactionMetadataStoreService()
                                 .removeTransactionMetadataStore(
                                         
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
@@ -1162,7 +1162,7 @@ public class PersistentTopicsBase extends AdminResource {
                             log.info("[{}] Successfully unloaded tc {}", 
clientAppId(),
                                     topicName.getPartitionIndex());
                             asyncResponse.resume(Response.noContent().build());
-                        }))
+                        })
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isNot307And404Exception(ex)) {
@@ -1373,13 +1373,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, 
boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1484,13 +1484,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, 
boolean authoritative, boolean perPartition,
                                                boolean getPreciseBacklog, 
boolean subscriptionBacklogSize,
                                                boolean 
getEarliestTimeInBacklog) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return  CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
                 authoritative, false)).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1570,14 +1570,15 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected void internalGetPartitionedStatsInternal(AsyncResponse 
asyncResponse, boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
-                .thenAccept(partitionMetadata -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
                         
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
@@ -2330,13 +2331,14 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, 
String subscriptionName,
             MessageIdImpl messageId, boolean authoritative, boolean 
replicated, Map<String, String> properties) {
-        CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
-        ret.thenAccept(__ -> {
+        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.SUBSCRIBE,
+                subscriptionName);
+        ret.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
             final MessageIdImpl targetMessageId = messageId == null ? 
(MessageIdImpl) MessageId.latest : messageId;
             log.info("[{}][{}] Creating subscription {} at message id {} with 
properties {}", clientAppId(),
                     topicName, subscriptionName, targetMessageId, properties);
@@ -2495,14 +2497,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalUpdateSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         Map<String, String> 
subscriptionProperties,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, 
subName,
                         subscriptionProperties, authoritative);
@@ -2574,14 +2575,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalAnalyzeSubscriptionBacklog(AsyncResponse 
asyncResponse, String subName,
                                                       Optional<Position> 
position,
                                                       boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> {
                     if (topicName.isPartitioned()) {
                         return CompletableFuture.completedFuture(null);
@@ -2613,14 +2613,13 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalGetSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                         authoritative);
@@ -4248,13 +4247,14 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), 
topicName);
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 internalTriggerCompactionNonPartitionedTopic(asyncResponse, 
authoritative);
@@ -4734,11 +4734,12 @@ public class PersistentTopicsBase extends AdminResource 
{
                     "Trim on a non-persistent topic is not allowed"));
             return null;
         }
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC);
         if (topicName.isPartitioned()) {
-            return validateTopicOperationAsync(topicName, 
TopicOperation.TRIM_TOPIC).thenCompose((x)
+            return future.thenCompose((x)
                     -> trimNonPartitionedTopic(asyncResponse, topicName, 
authoritative));
         }
-        return validateTopicOperationAsync(topicName, 
TopicOperation.TRIM_TOPIC)
+        return future
                 .thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
                 .thenCompose(metadata -> {
                     if (metadata.partitions > 0) {
@@ -5420,12 +5421,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> 
internalGetSchemaCompatibilityStrategy(boolean applied) {
+        CompletableFuture<Void> future = 
validateTopicPolicyOperationAsync(topicName,
+                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, 
PolicyOperation.READ);
         if (applied) {
-            return getSchemaCompatibilityStrategyAsync();
+            return future.thenCompose(__ -> 
getSchemaCompatibilityStrategyAsync());
         }
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)
+        return future
                 .thenCompose(n -> 
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
                     if (!op.isPresent()) {
                         return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
new file mode 100644
index 00000000000..cc905608016
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+@Test(groups = "broker-admin")
+public class NamespaceAuthZTest extends MockedPulsarStandalone {
+
+    private PulsarAdmin superUserAdmin;
+
+    private PulsarAdmin tenantManagerAdmin;
+
+    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @SneakyThrows
+    @BeforeClass
+    public void before() { // One-time setup: calls the auth-configuration helpers and start(), then builds the two privileged admin clients.
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start();
+        this.superUserAdmin =PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) // token is not declared in this class; presumably inherited from the base class — confirm
+                .build();
+        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); // register the random subject as an admin role of tenant "public"
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) // JWT whose "sub" claim is TENANT_ADMIN_SUBJECT
+                .build();
+    }
+
+
+    @SneakyThrows
+    @AfterClass
+    public void after() { // Teardown: close whichever admin clients were created, then call close().
+        if (superUserAdmin != null) { // may still be null if before() did not get as far as building it
+            superUserAdmin.close();
+        }
+        if (tenantManagerAdmin != null) { // same guard for the tenant-admin client
+            tenantManagerAdmin.close();
+        }
+        close(); // not declared in this class; presumably stops the mocked standalone — confirm
+    }
+
+
+    @SneakyThrows
+    @Test
+    public void testProperties() { // Namespace property endpoints: expected to work for superuser and tenant admin, rejected for any other subject.
+        final String random = UUID.randomUUID().toString();
+        final String namespace = "public/default";
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic); // NOTE(review): the topic is only created here and deleted at the end; no call in between uses it
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser: every property call is expected to succeed (any exception fails the test)
+        Map<String, String> properties = new HashMap<>();
+        properties.put("key1", "value1");
+        superUserAdmin.namespaces().setProperties(namespace, properties);
+        superUserAdmin.namespaces().setProperty(namespace, "key2", "value2");
+        superUserAdmin.namespaces().getProperties(namespace);
+        superUserAdmin.namespaces().getProperty(namespace, "key2");
+        superUserAdmin.namespaces().removeProperty(namespace, "key2");
+        superUserAdmin.namespaces().clearProperties(namespace);
+
+        // test tenant manager: the same calls through the tenant-admin client, also expected to succeed
+        tenantManagerAdmin.namespaces().setProperties(namespace, properties);
+        tenantManagerAdmin.namespaces().setProperty(namespace, "key2", 
"value2");
+        tenantManagerAdmin.namespaces().getProperties(namespace);
+        tenantManagerAdmin.namespaces().getProperty(namespace, "key2");
+        tenantManagerAdmin.namespaces().removeProperty(namespace, "key2");
+        tenantManagerAdmin.namespaces().clearProperties(namespace);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException on every property call
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setProperties(namespace, 
properties));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setProperty(namespace, "key2", 
"value2"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getProperties(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getProperty(namespace, "key2"));
+
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeProperty(namespace, "key2"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().clearProperties(namespace));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own; none of them is expected to unlock the property endpoints
+            superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().setProperties(namespace, 
properties));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().setProperty(namespace, "key2", 
"value2"));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().getProperties(namespace));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().getProperty(namespace, 
"key2"));
+
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().removeProperty(namespace, 
"key2"));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.namespaces().clearProperties(namespace));
+
+            
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().delete(topic, true);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
new file mode 100644
index 00000000000..370aea270f2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Test(groups = "broker-admin")
+public class TopicAuthZTest extends MockedPulsarStandalone {
+
+    private PulsarAdmin superUserAdmin;
+
+    private PulsarAdmin tenantManagerAdmin;
+
+    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @SneakyThrows
+    @BeforeClass
+    public void before() { // One-time setup: calls the auth-configuration helpers and start(), then builds the two privileged admin clients.
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start();
+        this.superUserAdmin =PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) // token is not declared in this class; presumably inherited from the base class — confirm
+                .build();
+        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); // register the random subject as an admin role of tenant "public"
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) // JWT whose "sub" claim is TENANT_ADMIN_SUBJECT
+                .build();
+    }
+
+
+    @SneakyThrows
+    @AfterClass
+    public void after() { // Teardown: close whichever admin clients were created, then call close().
+        if (superUserAdmin != null) { // may still be null if before() did not get as far as building it
+            superUserAdmin.close();
+        }
+        if (tenantManagerAdmin != null) { // same guard for the tenant-admin client
+            tenantManagerAdmin.close();
+        }
+        close(); // not declared in this class; presumably stops the mocked standalone — confirm
+    }
+
+
+    @SneakyThrows
+    @Test
+    public void testUnloadAndCompactAndTrim() { // unload, compaction, trim and schema-compatibility read: expected to work for superuser/tenant admin only.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser: all four calls are expected to succeed (any exception fails the test)
+        superUserAdmin.topics().unload(topic);
+        superUserAdmin.topics().triggerCompaction(topic);
+        
superUserAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()); // trim is issued against partition 0, not the partitioned topic itself
+        superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, 
false);
+
+        // test tenant manager: the same four calls through the tenant-admin client, also expected to succeed
+        tenantManagerAdmin.topics().unload(topic);
+        tenantManagerAdmin.topics().triggerCompaction(topic);
+        
tenantManagerAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName());
+        
tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException on each call
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().unload(topic));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().triggerCompaction(topic));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+        // Only the superuser and tenant admin may do these operations; with any other single grant they are not 
permitted.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().unload(topic));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().triggerCompaction(topic));
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().trimTopic(topic)); // NOTE(review): passes the partitioned topic name here, whereas the calls above trim partition 0
+
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> 
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testGetManagedLedgerInfo() { // getInternalInfo: expected to work for superuser, tenant admin, and subjects granted produce or consume.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser: the call is expected to succeed (any exception fails the test)
+        superUserAdmin.topics().getInternalInfo(topic);
+
+        // test tenant manager: also expected to succeed
+        tenantManagerAdmin.topics().getInternalInfo(topic);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getInternalInfo(topic));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own and check the outcome
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            if (action == AuthAction.produce || action == AuthAction.consume) { // only these two grants are expected to allow the call
+                subAdmin.topics().getInternalInfo(topic);
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getInternalInfo(topic));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testGetPartitionedStatsAndInternalStats() { // partitioned stats / internal stats: expected to work for superuser, tenant admin, and produce or consume grants.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser: both calls are expected to succeed (any exception fails the test)
+        superUserAdmin.topics().getPartitionedStats(topic, false);
+        superUserAdmin.topics().getPartitionedInternalStats(topic);
+
+        // test tenant manager: also expected to succeed
+        tenantManagerAdmin.topics().getPartitionedStats(topic, false);
+        tenantManagerAdmin.topics().getPartitionedInternalStats(topic);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException on both calls
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getPartitionedStats(topic, false));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getPartitionedInternalStats(topic));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own and check the outcome
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            if (action == AuthAction.produce || action == AuthAction.consume) { // only these two grants are expected to allow the calls
+                subAdmin.topics().getPartitionedStats(topic, false);
+                subAdmin.topics().getPartitionedInternalStats(topic);
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getPartitionedStats(topic, 
false));
+
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> 
subAdmin.topics().getPartitionedInternalStats(topic));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void 
testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog()
 { // subscription create / properties / backlog analysis: expected to work for superuser, tenant admin, and the consume grant.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+        AtomicInteger suffix = new AtomicInteger(1); // yields a fresh "test-sub<N>" name for every createSubscription attempt below
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser
+        superUserAdmin.topics().createSubscription(topic, "test-sub" + 
suffix.incrementAndGet(), MessageId.earliest);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().createSubscription(topic, "test-sub" + 
suffix.incrementAndGet(), MessageId.earliest);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().createSubscription(topic, "test-sub" + 
suffix.incrementAndGet(), MessageId.earliest));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own and check the outcome
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            if (action == AuthAction.consume) { // only the consume grant is expected to allow creating a subscription
+                subAdmin.topics().createSubscription(topic, "test-sub" + 
suffix.incrementAndGet(), MessageId.earliest);
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().createSubscription(topic, 
"test-sub" + suffix.incrementAndGet(), MessageId.earliest));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        // test updateSubscriptionProperties / getSubscriptionProperties / analyzeSubscriptionBacklog on the fixed subscription "test-sub"
+        Map<String, String> properties = new HashMap<>(); // left empty; the checks below only look at authorization outcomes
+        superUserAdmin.topics().createSubscription(topic, "test-sub", 
MessageId.earliest);
+        // test superuser
+        superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub" 
, properties);
+        superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+        
superUserAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty()); // backlog analysis is issued against partition 0
+
+        // test tenant manager
+        tenantManagerAdmin.topics().updateSubscriptionProperties(topic, 
"test-sub" , properties);
+        tenantManagerAdmin.topics().getSubscriptionProperties(topic, 
"test-sub");
+        
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty());
+
+        // test nobody: a subject with no grants must get NotAuthorizedException on each call
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().updateSubscriptionProperties(topic, 
"test-sub", properties));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getSubscriptionProperties(topic, 
"test-sub"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty()));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own and check the outcome
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            if (action == AuthAction.consume) { // only the consume grant is expected to allow these three calls
+                subAdmin.topics().updateSubscriptionProperties(topic, 
"test-sub", properties);
+                subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+                
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty());
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> 
subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));
+
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> 
subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));
+
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> 
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty()));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testCreateMissingPartition() { // createMissedPartitions: expected to work for superuser and tenant admin only; no AuthAction grant unlocks it.
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+        @Cleanup // Lombok closes subAdmin when the method scope exits
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser: the call is expected to succeed (any exception fails the test)
+        superUserAdmin.topics().createMissedPartitions(topic);
+
+        // test tenant manager: also expected to succeed
+        tenantManagerAdmin.topics().createMissedPartitions(topic);
+
+        // test nobody: a subject with no grants must get NotAuthorizedException
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().createMissedPartitions(topic));
+
+        for (AuthAction action : AuthAction.values()) { // grant each AuthAction on its own; none is expected to allow the call
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().createMissedPartitions(topic));
+            superUserAdmin.topics().revokePermissions(topic, subject); // drop the grant again before trying the next action
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+}

Reply via email to