This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ca01cd42ed [improve][admin]  Align the auth and check it at the first 
place for topic related API (#22507)
8ca01cd42ed is described below

commit 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Wed Apr 17 18:46:22 2024 +0800

    [improve][admin]  Align the auth and check it at the first place for topic 
related API (#22507)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 419 ++++++++++-----------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  44 ++-
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 257 +++++++++++--
 3 files changed, 447 insertions(+), 273 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ab74b1e2bcc..1f8d0657190 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -128,8 +128,6 @@ import 
org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -2727,14 +2725,14 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<MessageId> 
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
         return future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> {
                 if (topicName.isPartitioned()) {
                     return CompletableFuture.completedFuture(null);
                 } else {
@@ -2748,7 +2746,6 @@ public class PersistentTopicsBase extends AdminResource {
                         });
                 }
             }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-            .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
             .thenCompose(__ -> getTopicReferenceAsync(topicName))
             .thenCompose(topic -> {
                 if (!(topic instanceof PersistentTopic)) {
@@ -3158,65 +3155,56 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse,
                                                      MessageIdImpl messageId, 
boolean authoritative) {
-        CompletableFuture<Void> ret;
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (!topicName.isPartitioned()) {
-            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
-                    .thenCompose(topicMetadata -> {
-                        if (topicMetadata.partitions > 0) {
-                            log.warn("[{}] Not supported calculate backlog 
size operation on partitioned-topic {}",
-                                    clientAppId(), topicName);
-                            asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
-                                    "calculate backlog size is not allowed for 
partitioned-topic"));
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    });
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
-        } else {
-            future = ret;
-        }
-        future.thenAccept(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName,
-                        TopicOperation.GET_BACKLOG_SIZE))
-                .thenCompose(unused -> getTopicReferenceAsync(topicName))
-                .thenAccept(t -> {
-                    PersistentTopic topic = (PersistentTopic) t;
-                    PositionImpl pos = new 
PositionImpl(messageId.getLedgerId(),
-                            messageId.getEntryId());
-                    if (topic == null) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                
getTopicNotFoundErrorMessage(topicName.toString())));
-                        return;
-                    }
-                    ManagedLedgerImpl managedLedger =
-                            (ManagedLedgerImpl) topic.getManagedLedger();
-                    if (messageId.getLedgerId() == -1) {
-                        asyncResponse.resume(managedLedger.getTotalSize());
-                    } else {
-                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
-                    }
-                }).exceptionally(ex -> {
-                    // If the exception is not redirect exception we need to 
log it.
-                    if (isNot307And404Exception(ex)) {
-                        log.error("[{}] Failed to get backlog size for topic 
{}", clientAppId(),
-                                topicName, ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                })).exceptionally(ex -> {
-                        // If the exception is not redirect exception we need 
to log it.
-                        if (isNot307And404Exception(ex)) {
-                            log.error("[{}] Failed to validate global 
namespace ownership "
-                                    + "to get backlog size for topic {}", 
clientAppId(), topicName, ex);
-                        }
-                        resumeAsyncResponseExceptionally(asyncResponse, ex);
-                        return null;
-                });
+        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.GET_BACKLOG_SIZE);
+        ret.thenCompose(__ -> {
+            // If the topic name is a partition name, no need to get partition 
topic metadata again
+            if (!topicName.isPartitioned()) {
+                return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                        .thenCompose(topicMetadata -> {
+                            if (topicMetadata.partitions > 0) {
+                                log.warn("[{}] Not supported calculate backlog 
size operation on partitioned-topic {}",
+                                        clientAppId(), topicName);
+                                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                        "calculate backlog size is not allowed 
for partitioned-topic"));
+                            }
+                            return CompletableFuture.completedFuture(null);
+                        });
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        .thenCompose(unused -> getTopicReferenceAsync(topicName))
+        .thenAccept(t -> {
+            PersistentTopic topic = (PersistentTopic) t;
+            PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
+                    messageId.getEntryId());
+            if (topic == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getTopicNotFoundErrorMessage(topicName.toString())));
+                return;
+            }
+            ManagedLedgerImpl managedLedger =
+                    (ManagedLedgerImpl) topic.getManagedLedger();
+            if (messageId.getLedgerId() == -1) {
+                asyncResponse.resume(managedLedger.getTotalSize());
+            } else {
+                
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+            }
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isNot307And404Exception(ex)) {
+                log.error("[{}] Failed to validate global namespace ownership "
+                        + "to get backlog size for topic {}", clientAppId(), 
topicName, ex);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
     }
 
     protected CompletableFuture<Void> 
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
@@ -3224,8 +3212,7 @@ public class PersistentTopicsBase extends AdminResource {
         BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType 
== null
                 ? BacklogQuota.BacklogQuotaType.destination_storage : 
backlogQuotaType;
 
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.BACKLOG, PolicyOperation.WRITE)
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+        return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
                 .thenCompose(op -> {
                     TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
@@ -3266,9 +3253,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
-
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+        return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> {
                     if (CollectionUtils.isEmpty(clusterIds)) {
                         throw new RestException(Status.PRECONDITION_FAILED, 
"ClusterIds should not be null or empty");
@@ -3306,22 +3291,21 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<Void> internalRemoveReplicationClusters() {
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> 
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
-                            TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
-                            topicPolicies.setReplicationClusters(null);
-                            return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
-                                    .thenRun(() -> {
-                                        log.info("[{}] Successfully set 
replication clusters for namespace={}, "
-                                                        + "topic={}, 
clusters={}",
-                                                clientAppId(),
-                                                namespaceName,
-                                                topicName.getLocalName(),
-                                                
topicPolicies.getReplicationClusters());
-                                    });
-                        })
-                );
+        return validatePoliciesReadOnlyAccessAsync()
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(null);
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication 
clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        
topicPolicies.getReplicationClusters());
+                            });
+                });
     }
 
     protected CompletableFuture<Boolean> internalGetDeduplication(boolean 
applied, boolean isGlobal) {
@@ -3683,29 +3667,29 @@ public class PersistentTopicsBase extends AdminResource 
{
                     "Termination of a system topic is not allowed"));
         }
 
-        CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
-        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.TERMINATE))
-                .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
-                .thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions > 0) {
-                        throw new RestException(Status.METHOD_NOT_ALLOWED,
-                                "Termination of a partitioned topic is not 
allowed");
-                    }
-                })
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenCompose(topic -> {
-                    if (!(topic instanceof PersistentTopic)) {
-                        throw new RestException(Status.METHOD_NOT_ALLOWED,
-                                "Termination of a non-persistent topic is not 
allowed");
-                    }
-                    return ((PersistentTopic) topic).terminate();
-                });
+        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.TERMINATE);
+        return ret.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        .thenAccept(partitionMetadata -> {
+            if (partitionMetadata.partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Termination of a partitioned topic is not allowed");
+            }
+        })
+        .thenCompose(__ -> getTopicReferenceAsync(topicName))
+        .thenCompose(topic -> {
+            if (!(topic instanceof PersistentTopic)) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Termination of a non-persistent topic is not 
allowed");
+            }
+            return ((PersistentTopic) topic).terminate();
+        });
     }
 
     protected void internalTerminatePartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
@@ -3716,73 +3700,63 @@ public class PersistentTopicsBase extends AdminResource 
{
             asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, 
msg));
             return;
         }
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.TERMINATE);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        .thenAccept(partitionMetadata -> {
+            if (partitionMetadata.partitions == 0) {
+                String msg = "Termination of a non-partitioned topic is not 
allowed using partitioned-terminate"
+                        + ", please use terminate commands";
+                log.error("[{}] [{}] {}", clientAppId(), topicName, msg);
+                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, msg));
+                return;
+            }
+            if (partitionMetadata.partitions > 0) {
+                Map<Integer, MessageId> messageIds = new 
ConcurrentHashMap<>(partitionMetadata.partitions);
+                final List<CompletableFuture<MessageId>> futures =
+                        new ArrayList<>(partitionMetadata.partitions);
 
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.TERMINATE)
-                .thenCompose(unused -> 
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
-                .thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        String msg = "Termination of a non-partitioned topic 
is not allowed using partitioned-terminate"
-                                + ", please use terminate commands";
-                        log.error("[{}] [{}] {}", clientAppId(), topicName, 
msg);
-                        asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, msg));
-                        return;
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        int finalI = i;
+                        futures.add(pulsar().getAdminClient().topics()
+                                
.terminateTopicAsync(topicNamePartition.toString())
+                                .whenComplete((messageId, throwable) -> {
+                                    if (throwable != null) {
+                                        log.error("[{}] Failed to terminate 
topic {}", clientAppId(),
+                                                topicNamePartition, throwable);
+                                        asyncResponse.resume(new 
RestException(throwable));
+                                    }
+                                    messageIds.put(finalI, messageId);
+                                }));
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to terminate topic {}", 
clientAppId(), topicNamePartition,
+                                e);
+                        throw new RestException(e);
                     }
-                    if (partitionMetadata.partitions > 0) {
-                        Map<Integer, MessageId> messageIds = new 
ConcurrentHashMap<>(partitionMetadata.partitions);
-                        final List<CompletableFuture<MessageId>> futures =
-                                new ArrayList<>(partitionMetadata.partitions);
-
-                        for (int i = 0; i < partitionMetadata.partitions; i++) 
{
-                            TopicName topicNamePartition = 
topicName.getPartition(i);
-                            try {
-                                int finalI = i;
-                                futures.add(pulsar().getAdminClient().topics()
-                                        
.terminateTopicAsync(topicNamePartition.toString())
-                                        .whenComplete((messageId, throwable) 
-> {
-                                            if (throwable != null) {
-                                                log.error("[{}] Failed to 
terminate topic {}", clientAppId(),
-                                                        topicNamePartition, 
throwable);
-                                                asyncResponse.resume(new 
RestException(throwable));
-                                            }
-                                            messageIds.put(finalI, messageId);
-                                        }));
-                            } catch (Exception e) {
-                                log.error("[{}] Failed to terminate topic {}", 
clientAppId(), topicNamePartition,
-                                        e);
-                                throw new RestException(e);
-                            }
+                }
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable t = exception.getCause();
+                        if (t instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                    
getTopicNotFoundErrorMessage(topicName.toString())));
+                        } else {
+                            log.error("[{}] Failed to terminate topic {}", 
clientAppId(), topicName, t);
+                            asyncResponse.resume(new RestException(t));
                         }
-                        FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                            if (exception != null) {
-                                Throwable t = exception.getCause();
-                                if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                            
getTopicNotFoundErrorMessage(topicName.toString())));
-                                } else {
-                                    log.error("[{}] Failed to terminate topic 
{}", clientAppId(), topicName, t);
-                                    asyncResponse.resume(new RestException(t));
-                                }
-                            }
-                            asyncResponse.resume(messageIds);
-                            return null;
-                        });
                     }
-                }).exceptionally(ex -> {
-                    // If the exception is not redirect exception we need to 
log it.
-                    if (isNot307And404Exception(ex)) {
-                        log.error("[{}] Failed to terminate topic {}", 
clientAppId(), topicName, ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    asyncResponse.resume(messageIds);
                     return null;
-                })
-        ).exceptionally(ex -> {
+                });
+            }
+        }).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
             if (isNot307And404Exception(ex)) {
                 log.error("[{}] Failed to terminate topic {}", clientAppId(), 
topicName, ex);
@@ -4186,16 +4160,16 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<LongRunningProcessStatus> 
internalCompactionStatusAsync(boolean authoritative) {
-        return validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.COMPACT))
+        return validateTopicOperationAsync(topicName, TopicOperation.COMPACT)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenApply(topic -> ((PersistentTopic) 
topic).compactionStatus());
     }
 
     protected void internalTriggerOffload(AsyncResponse asyncResponse,
                                           boolean authoritative, MessageIdImpl 
messageId) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.OFFLOAD))
+        validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                     try {
@@ -4221,8 +4195,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean 
authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.OFFLOAD))
+        validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                     OffloadProcessStatus offloadProcessStatus = 
((PersistentTopic) topic).offloadStatus();
@@ -4482,8 +4456,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetLastMessageId(AsyncResponse asyncResponse, 
boolean authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+        validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                     if (topic == null) {
@@ -5207,33 +5181,27 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> 
internalGetSchemaCompatibilityStrategy(boolean applied) {
-        CompletableFuture<Void> future = 
validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, 
PolicyOperation.READ);
         if (applied) {
-            return future.thenCompose(__ -> 
getSchemaCompatibilityStrategyAsync());
+            return getSchemaCompatibilityStrategyAsync();
         }
-        return future
-                .thenCompose(n -> 
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
+        return getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
                     if (!op.isPresent()) {
                         return null;
                     }
                     SchemaCompatibilityStrategy strategy = 
op.get().getSchemaCompatibilityStrategy();
                     return SchemaCompatibilityStrategy.isUndefined(strategy) ? 
null : strategy;
-                }));
+                });
     }
 
     protected CompletableFuture<Void> 
internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.WRITE)
-                .thenCompose((__) -> getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName)
                         .thenCompose(op -> {
                             TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
                             topicPolicies.setSchemaCompatibilityStrategy(
                                     strategy == 
SchemaCompatibilityStrategy.UNDEFINED ? null : strategy);
                             return pulsar().getTopicPoliciesService()
                                     .updateTopicPoliciesAsync(topicName, 
topicPolicies);
-                        }));
+                        });
     }
 
     protected CompletableFuture<Boolean> 
internalGetSchemaValidationEnforced(boolean applied) {
@@ -5257,54 +5225,47 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean 
applied, boolean isGlobal) {
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
-                .thenCompose(__ -> {
-                    if (!applied) {
-                        return getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
-                                .thenApply(op -> 
op.map(TopicPolicies::getEntryFilters).orElse(null));
-                    }
-                    if 
(!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
-                        return CompletableFuture.completedFuture(new 
EntryFilters(String.join(",",
-                                
pulsar().getConfiguration().getEntryFilterNames())));
+        if (!applied) {
+            return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                    .thenApply(op -> 
op.map(TopicPolicies::getEntryFilters).orElse(null));
+        }
+        if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
+            return CompletableFuture.completedFuture(new 
EntryFilters(String.join(",",
+                    pulsar().getConfiguration().getEntryFilterNames())));
+        }
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenApply(op -> op.map(TopicPolicies::getEntryFilters))
+                .thenCompose(policyEntryFilters -> {
+                    if (policyEntryFilters.isPresent()) {
+                        return 
CompletableFuture.completedFuture(policyEntryFilters.get());
                     }
-                    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
-                            .thenApply(op -> 
op.map(TopicPolicies::getEntryFilters))
-                            .thenCompose(policyEntryFilters -> {
-                                if (policyEntryFilters.isPresent()) {
-                                    return 
CompletableFuture.completedFuture(policyEntryFilters.get());
+                    return getNamespacePoliciesAsync(namespaceName)
+                            .thenApply(policies -> policies.entryFilters)
+                            .thenCompose(nsEntryFilters -> {
+                                if (nsEntryFilters != null) {
+                                    return 
CompletableFuture.completedFuture(nsEntryFilters);
                                 }
-                                return getNamespacePoliciesAsync(namespaceName)
-                                        .thenApply(policies -> 
policies.entryFilters)
-                                        .thenCompose(nsEntryFilters -> {
-                                            if (nsEntryFilters != null) {
-                                                return 
CompletableFuture.completedFuture(nsEntryFilters);
-                                            }
-                                            return 
CompletableFuture.completedFuture(new EntryFilters(String.join(",",
-                                                    
pulsar().getConfiguration().getEntryFilterNames())));
-                                        });
+                                return CompletableFuture.completedFuture(new 
EntryFilters(String.join(",",
+                                        
pulsar().getConfiguration().getEntryFilterNames())));
                             });
                 });
     }
 
     protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters 
entryFilters,
                                                               boolean 
isGlobal) {
-
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
-                .thenAccept(__ -> validateEntryFilters(entryFilters))
-                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
+        validateEntryFilters(entryFilters);
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
                 .thenCompose(op -> {
                     TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
                     topicPolicies.setEntryFilters(entryFilters);
                     topicPolicies.setIsGlobal(isGlobal);
                     return pulsar().getTopicPoliciesService()
                             .updateTopicPoliciesAsync(topicName, 
topicPolicies);
-                }));
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveEntryFilters(boolean 
isGlobal) {
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
-                .thenCompose(__ ->
-                        getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
                         .thenCompose(op -> {
                             if (!op.isPresent()) {
                                 return CompletableFuture.completedFuture(null);
@@ -5312,7 +5273,7 @@ public class PersistentTopicsBase extends AdminResource {
                             op.get().setEntryFilters(null);
                             op.get().setIsGlobal(isGlobal);
                             return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
-                        }));
+                        });
     }
 
     protected CompletableFuture<Void> validateShadowTopics(List<String> 
shadowTopics) {
@@ -5348,8 +5309,7 @@ public class PersistentTopicsBase extends AdminResource {
             return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
                     "Cannot specify empty shadow topics, please use remove 
command instead."));
         }
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+        return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> validateShadowTopics(shadowTopics))
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
                 .thenCompose(op -> {
@@ -5361,8 +5321,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalDeleteShadowTopics() {
-        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+        return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(shadowTopicName -> 
getTopicPoliciesAsyncWithRetry(topicName))
                 .thenCompose(op -> {
                     TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 90f0208c81c..7e138442ae2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2149,7 +2149,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
             @ApiParam(value = "backlog quota policies for the specified 
topic") BacklogQuotaImpl backlogQuota) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, 
backlogQuota, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2174,7 +2175,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, null, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2237,7 +2239,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "List of replication clusters", required = true) 
List<String> clusterIds) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -2260,7 +2263,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalRemoveReplicationClusters())
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -4405,8 +4409,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__-> 
internalGetSchemaCompatibilityStrategy(applied))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
@@ -4436,8 +4440,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Strategy used to check the compatibility of new 
schema")
                     SchemaCompatibilityStrategy strategy) {
         validateTopicName(tenant, namespace, encodedTopic);
-
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> 
internalSetSchemaCompatibilityStrategy(strategy))
                 .thenRun(() -> {
                     log.info(
@@ -4476,8 +4480,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Strategy used to check the compatibility of new 
schema")
                     SchemaCompatibilityStrategy strategy) {
         validateTopicName(tenant, namespace, encodedTopic);
-
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> 
internalSetSchemaCompatibilityStrategy(null))
                 .thenRun(() -> {
                     log.info(
@@ -4568,7 +4572,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                         + "broker. For internal use.")
                                 @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, 
PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
@@ -4596,7 +4601,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                             @ApiParam(value = "Entry filters 
for the specified topic")
                                         EntryFilters entryFilters) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetEntryFilters(entryFilters, 
isGlobal))
                 .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -4622,7 +4628,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                                             + "call to this broker. For 
internal use.")
                                     @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
                 .thenRun(() -> {
                     log.info(
@@ -4655,9 +4662,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
-                .thenCompose(__ -> 
validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC,
-                        PolicyOperation.READ))
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, 
PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
                 .thenAccept(op -> 
asyncResponse.resume(op.map(TopicPolicies::getShadowTopics).orElse(null)))
                 .exceptionally(ex -> {
@@ -4684,7 +4690,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "List of shadow topics", required = true) 
List<String> shadowTopics) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalSetShadowTopic(shadowTopics))
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -4710,7 +4717,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
                 .thenCompose(__ -> internalDeleteShadowTopics())
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index e6ff0ce2bb4..3c0596d531f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -19,48 +19,54 @@
 
 package org.apache.pulsar.broker.admin;
 
+import com.google.common.collect.Lists;
 import io.jsonwebtoken.Jwts;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
+import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
 public class TopicAuthZTest extends MockedPulsarStandalone {
@@ -1105,15 +1111,15 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         deleteTopic(topic, false);
     }
 
-    @Test(dataProvider = "partitioned", groups = "flaky")
+    @Test
     @SneakyThrows
-    public void testExpireMessage(boolean partitioned) {
+    public void testExpireMessage() {
         final String random = UUID.randomUUID().toString();
         final String topic = "persistent://public/default/" + random;
         final String subject =  UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
-        createTopic(topic, partitioned);
+        createTopic(topic, false);
         @Cleanup
         final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
@@ -1153,7 +1159,7 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
             }
             superUserAdmin.topics().revokePermissions(topic, subject);
         }
-        deleteTopic(topic, partitioned);
+        deleteTopic(topic, false);
     }
 
     @Test
@@ -1373,6 +1379,37 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         };
     }
 
+    @Test
+    @SneakyThrows
+    public void testSchemaCompatibility() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, 
true);
+
+        // test tenant manager
+        
tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, true);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> 
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        deleteTopic(topic, false);
+    }
+
     @Test(dataProvider = "authFunction")
     public void 
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> 
adminConsumer, OperationAuthType topicOpType)
             throws Exception {
@@ -1380,6 +1417,7 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
 
+
         @Cleanup
         final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
@@ -1419,8 +1457,8 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         if (execFlag != null) {
             Assert.assertTrue(execFlag.get());
         }
-    }
 
+    }
 
     private boolean authActionMatchOperation(OperationAuthType 
operationAuthType, AuthAction action) {
         switch (operationAuthType) {
@@ -1449,6 +1487,175 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         return false;
     }
 
+    @Test
+    @SneakyThrows
+    public void testGetEntryFilter() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        //
+        superUserAdmin.topicPolicies().getEntryFiltersPerTopic(topic, true);
+
+        // test tenant manager
+        tenantManagerAdmin.topicPolicies().getEntryFiltersPerTopic(topic, 
true);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> 
subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testSetEntryFilter() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        //
+        final EntryFilterProvider oldEntryFilterProvider = 
getPulsarService().getBrokerService().getEntryFilterProvider();
+        @Cleanup
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(getServiceConfiguration());
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        FieldUtils.writeField(getPulsarService().getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+        final EntryFilters entryFilter = new EntryFilters("test");
+        superUserAdmin.topicPolicies().setEntryFiltersPerTopic(topic, 
entryFilter);
+
+        // test tenant manager
+        tenantManagerAdmin.topicPolicies().setEntryFiltersPerTopic(topic, 
entryFilter);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, 
entryFilter));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> 
subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        deleteTopic(topic, false);
+        FieldUtils.writeField(getPulsarService().getBrokerService(),
+                "entryFilterProvider", oldEntryFilterProvider, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testRemoveEntryFilter() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        final EntryFilterProvider oldEntryFilterProvider = 
getPulsarService().getBrokerService().getEntryFilterProvider();
+        @Cleanup
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(getServiceConfiguration());
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        FieldUtils.writeField(getPulsarService().getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+        final EntryFilters entryFilter = new EntryFilters("test");
+        superUserAdmin.topicPolicies().removeEntryFiltersPerTopic(topic);
+        // test tenant manager
+        tenantManagerAdmin.topicPolicies().removeEntryFiltersPerTopic(topic);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> 
subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        deleteTopic(topic, false);
+        FieldUtils.writeField(getPulsarService().getBrokerService(),
+                "entryFilterProvider", oldEntryFilterProvider, true);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testShadowTopic() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        String shadowTopic = topic + "-shadow-topic";
+        superUserAdmin.topics().createShadowTopic(shadowTopic, topic);
+        superUserAdmin.topics().setShadowTopics(topic, 
Lists.newArrayList(shadowTopic));
+        superUserAdmin.topics().getShadowTopics(topic);
+        superUserAdmin.topics().removeShadowTopics(topic);
+
+
+        // test tenant manager
+        tenantManagerAdmin.topics().setShadowTopics(topic, 
Lists.newArrayList(shadowTopic));
+        tenantManagerAdmin.topics().getShadowTopics(topic);
+        tenantManagerAdmin.topics().removeShadowTopics(topic);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().setShadowTopics(topic, 
Lists.newArrayList(shadowTopic)));
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getShadowTopics(topic));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().setShadowTopics(topic, 
Lists.newArrayList(shadowTopic)));
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().getShadowTopics(topic));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        deleteTopic(topic, false);
+    }
+
     private void createTopic(String topic, boolean partitioned) throws 
Exception {
         if (partitioned) {
             superUserAdmin.topics().createPartitionedTopic(topic, 2);

Reply via email to