This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new dcaf508f8e3 [improve][broker] Consistently add fine-grain
authorization to REST API (#22202)
dcaf508f8e3 is described below
commit dcaf508f8e381107125e98722f4ddab76f9303ad
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Mar 6 22:00:45 2024 +0800
[improve][broker] Consistently add fine-grain authorization to REST API
(#22202)
(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
.../pulsar/broker/admin/v2/PersistentTopics.java | 233 +++++++++------
.../broker/admin/TopicPoliciesAuthZTest.java | 312 ++++++++++++++++++++-
2 files changed, 462 insertions(+), 83 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fc46f9c872..94fb1f53ac7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -356,7 +357,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -379,7 +381,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Offload policies for the specified topic")
OffloadPoliciesImpl offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies,
isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -401,7 +404,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -425,7 +429,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnConsumer",
ex, asyncResponse);
@@ -449,7 +454,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Max unacked messages on consumer policies for
the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -471,7 +477,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null,
isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -494,7 +501,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal))
.thenAccept(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
@@ -522,7 +530,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetDeduplicationSnapshotInterval(interval, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -544,7 +553,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null,
isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -568,7 +578,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetInactiveTopicPolicies(applied,
isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex,
asyncResponse);
@@ -591,7 +602,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "inactive topic policies for the specified
topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -613,7 +625,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetInactiveTopicPolicies(null,
isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -637,7 +650,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -712,7 +726,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied,
isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -1909,16 +1924,17 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalExamineMessageAsync(initialPosition, messagePosition,
authoritative)
- .thenAccept(asyncResponse::resume)
- .exceptionally(ex -> {
- if (isNot307And404Exception(ex)) {
- log.error("[{}] Failed to examine a specific message
on the topic {}", clientAppId(), topicName,
- ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
+ validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ -> internalExamineMessageAsync(initialPosition,
messagePosition, authoritative))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to examine a specific message on
the topic {}", clientAppId(), topicName,
+ ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -2025,7 +2041,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalGetBacklogAsync(authoritative)
+ validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+ .thenCompose(__ -> internalGetBacklogAsync(authoritative))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -2081,7 +2098,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2164,7 +2182,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "For internal use.")
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> {
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(()
-> {
@@ -2246,7 +2265,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal))
.thenAccept(op -> asyncResponse.resume(op
.map(TopicPolicies::getMessageTTLInSeconds)
@@ -2283,7 +2303,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2310,7 +2331,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2336,7 +2358,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2363,7 +2386,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "DeduplicationEnabled policies for the specified
topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2388,7 +2412,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2494,7 +2519,6 @@ public class PersistentTopics extends
PersistentTopicsBase {
return null;
});
}
-
@GET
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(
@@ -2515,7 +2539,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPersistence(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2543,7 +2568,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Bookkeeper persistence policies for specified
topic")
PersistencePolicies
persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPersistence(persistencePolicies,
isGlobal))
.thenRun(() -> {
try {
@@ -2579,7 +2605,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePersistence(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove persistence policies:
namespace={}, topic={}",
@@ -2610,7 +2637,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -2638,7 +2666,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "The max subscriptions of the topic") int
maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic:
namespace={}, topic={}"
@@ -2668,7 +2697,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null,
isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove maxSubscriptionsPerTopic:
namespace={}, topic={}",
@@ -2698,7 +2728,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION_RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied,
isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2725,7 +2756,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Replicator dispatch rate of the topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate,
isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate:
namespace={}, topic={}"
@@ -2755,7 +2787,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(null,
isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate
limit: namespace={}, topic={}",
@@ -2785,7 +2818,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxProducers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2812,7 +2846,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "The max producers of the topic") int
maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max producers:
namespace={}, topic={}, maxProducers={}",
@@ -2844,7 +2879,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxProducers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max producers:
namespace={}, topic={}",
@@ -2876,7 +2912,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2903,7 +2940,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "The max consumers of the topic") int
maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max consumers:
namespace={}, topic={}, maxConsumers={}",
@@ -2935,7 +2973,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxConsumers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max consumers:
namespace={}, topic={}",
@@ -2966,7 +3005,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxMessageSize(isGlobal))
.thenAccept(policies -> {
asyncResponse.resume(policies.isPresent() ? policies.get() :
Response.noContent().build());
@@ -2995,7 +3035,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "The max message size of the topic") int
maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize,
isGlobal))
.thenRun(() -> {
log.info(
@@ -3029,7 +3070,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max message size:
namespace={}, topic={}",
@@ -3337,7 +3379,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3363,7 +3406,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3399,7 +3443,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic dispatch rate:
tenant={}, namespace={}, topic={}",
@@ -3435,7 +3480,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionDispatchRate(applied,
isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3463,7 +3509,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Subscription message dispatch rate for the
specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3499,7 +3546,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalRemoveSubscriptionDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic subscription dispatch
rate: tenant={}, namespace={}, topic={}",
@@ -3533,7 +3581,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), applied, isGlobal))
.thenApply(asyncResponse::resume)
@@ -3563,7 +3612,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Subscription message dispatch rate for the
specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), dispatchRate,
isGlobal))
.thenRun(() -> {
@@ -3601,7 +3651,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionLevelDispatchRate(
Codec.decode(encodedSubscriptionName), isGlobal))
.thenRun(() -> {
@@ -3633,7 +3684,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetCompactionThreshold(applied,
isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3659,7 +3711,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") long
compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetCompactionThreshold(compactionThreshold, isGlobal))
.thenRun(() -> {
try {
@@ -3695,7 +3748,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic compaction threshold:
tenant={}, namespace={}, topic={}",
@@ -3730,7 +3784,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalGetMaxConsumersPerSubscription(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3758,7 +3813,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") int
maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
.thenRun(() -> {
try {
@@ -3794,7 +3850,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalRemoveMaxConsumersPerSubscription(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic max consumers per
subscription:"
@@ -3827,7 +3884,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPublishRate(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3854,7 +3912,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Dispatch rate for the specified topic")
PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
.thenRun(() -> {
try {
@@ -3891,7 +3950,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePublishRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic publish rate:
tenant={}, namespace={}, topic={}, isGlobal={}",
@@ -3928,7 +3988,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
.thenAccept(op -> {
asyncResponse.resume(op.isPresent() ? op.get()
@@ -3958,7 +4019,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Enable sub types for the specified topic")
Set<SubscriptionType> subscriptionTypesEnabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
.thenRun(() -> {
try {
@@ -3994,7 +4056,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalRemoveSubscriptionTypesEnabled(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove subscription types
enabled: namespace={}, topic={}",
@@ -4026,7 +4089,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
@@ -4052,7 +4116,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Subscribe rate for the specified topic")
SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscribeRate(subscribeRate,
isGlobal))
.thenRun(() -> {
try {
@@ -4090,7 +4155,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Subscribe rate for the specified topic")
SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
.thenRun(() -> {
log.info(
@@ -4338,7 +4404,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "broker. For internal
use.")
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalGetSchemaValidationEnforced(applied))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -4365,7 +4432,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@ApiParam(required = true) boolean
schemaValidationEnforced) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetSchemaValidationEnforced(schemaValidationEnforced))
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4561,7 +4629,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Settings for automatic subscription creation")
AutoSubscriptionCreationOverrideImpl
autoSubscriptionCreationOverride) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetAutoSubscriptionCreation(autoSubscriptionCreationOverride, isGlobal))
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4587,7 +4656,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalGetAutoSubscriptionCreation(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getAutoSubscriptionCreation",
ex, asyncResponse);
@@ -4612,7 +4682,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetAutoSubscriptionCreation(null,
isGlobal))
.thenRun(() -> {
log.info(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index f07b9a6c2aa..bcb8e3233a0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.awaitility.Awaitility.await;
import io.jsonwebtoken.Jwts;
import java.util.Set;
import java.util.UUID;
@@ -27,6 +28,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
@@ -35,8 +38,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.awaitility.Awaitility.await;
-
public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
@@ -172,4 +173,311 @@ public final class TopicPoliciesAuthZTest extends
MockedPulsarStandalone {
}
}
+ @SneakyThrows
+ @Test
+ public void testOffloadPolicy() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ final OffloadPoliciesImpl definedOffloadPolicies = new
OffloadPoliciesImpl();
+ definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+ definedOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(100L);
+
definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+ definedOffloadPolicies.setManagedLedgerOffloadDriver("s3");
+ definedOffloadPolicies.setManagedLedgerOffloadBucket("buck");
+
+ // test superuser
+ superUserAdmin.topicPolicies().setOffloadPolicies(topic,
definedOffloadPolicies);
+
+ // because the topic policies is eventual consistency, we should wait
here
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy =
superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+ });
+ superUserAdmin.topicPolicies().removeOffloadPolicies(topic);
+
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy =
superUserAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertNull(offloadPolicy);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic,
definedOffloadPolicies);
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy =
tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertEquals(offloadPolicy, definedOffloadPolicies);
+ });
+ tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic);
+ await().untilAsserted(() -> {
+ final OffloadPolicies offloadPolicy =
tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.assertNull(offloadPolicy);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setOffloadPolicies(topic,
definedOffloadPolicies);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setOffloadPolicies(topic,
definedOffloadPolicies);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeOffloadPolicies(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default",
subject);
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnConsumer() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic,
definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait
here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer =
superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer,
definedUnackedMessagesOnConsumer);
+ });
+
superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer =
superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+
tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic,
definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer =
tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer,
definedUnackedMessagesOnConsumer);
+ });
+
tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer =
tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic,
definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+
subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+
subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic,
definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default",
subject);
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnSubscription() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+
superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic,
definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait
here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer =
superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer,
definedUnackedMessagesOnConsumer);
+ });
+
superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer =
superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+
tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic,
definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer =
tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer,
definedUnackedMessagesOnConsumer);
+ });
+
tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer =
tenantManagerAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+
subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+
subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic,
definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+
subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+
subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic,
definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default",
subject);
+ }
+
+ }
}