This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15b655cfea3 [improve][sec] Align some namespace level policy
authorisation check (#21640)
15b655cfea3 is described below
commit 15b655cfea381c63e754bc8f066c311b097198bb
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Dec 4 22:15:19 2023 +0800
[improve][sec] Align some namespace level policy authorisation check
(#21640)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 30 +++++++++++++---------
.../apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++-
2 files changed, 20 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c5174991298..caaff010439 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1178,7 +1178,8 @@ public abstract class NamespacesBase extends
AdminResource {
protected CompletableFuture<Void> internalSetPublishRateAsync(PublishRate
maxPublishMessageRate) {
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(),
namespaceName, maxPublishMessageRate);
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(),
maxPublishMessageRate);
log.info("[{}] Successfully updated the publish_max_message_rate
for cluster on namespace {}",
clientAppId(), namespaceName);
@@ -1207,7 +1208,8 @@ public abstract class NamespacesBase extends
AdminResource {
protected CompletableFuture<Void> internalRemovePublishRateAsync() {
log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(),
namespaceName, topicName);
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
if (policies.publishMaxMessageRate != null) {
policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());
}
@@ -1227,7 +1229,8 @@ public abstract class NamespacesBase extends
AdminResource {
@SuppressWarnings("deprecation")
protected CompletableFuture<Void>
internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) {
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(),
namespaceName, dispatchRate);
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(),
dispatchRate);
policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(),
dispatchRate);
log.info("[{}] Successfully updated the dispatchRate for cluster
on namespace {}", clientAppId(),
@@ -1237,7 +1240,8 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName());
policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName());
log.info("[{}] Successfully delete the dispatchRate for cluster on
namespace {}", clientAppId(),
@@ -1254,7 +1258,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected CompletableFuture<Void>
internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) {
- return validateSuperUserAccessAsync()
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(),
dispatchRate);
log.info("[{}] Successfully updated the
subscriptionDispatchRate for cluster on namespace {}",
@@ -1264,7 +1268,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected CompletableFuture<Void>
internalDeleteSubscriptionDispatchRateAsync() {
- return validateSuperUserAccessAsync()
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
log.info("[{}] Successfully delete the
subscriptionDispatchRate for cluster on namespace {}",
@@ -1282,7 +1286,8 @@ public abstract class NamespacesBase extends
AdminResource {
protected CompletableFuture<Void>
internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(),
namespaceName, subscribeRate);
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(),
subscribeRate);
log.info("[{}] Successfully updated the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
@@ -1291,7 +1296,8 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() {
- return validateSuperUserAccessAsync().thenCompose(__ ->
updatePoliciesAsync(namespaceName, policies -> {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
log.info("[{}] Successfully delete the subscribeRate for cluster
on namespace {}", clientAppId(),
namespaceName);
@@ -1624,7 +1630,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected void internalSetInactiveTopic(InactiveTopicPolicies
inactiveTopicPolicies) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName,
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
}
@@ -2010,7 +2016,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected void internalSetMaxSubscriptionsPerTopic(Integer
maxSubscriptionsPerTopic){
- validateSuperUserAccess();
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
@@ -2518,7 +2524,7 @@ public abstract class NamespacesBase extends
AdminResource {
* Notion: don't re-use this logic.
*/
protected void internalSetReplicatorDispatchRate(AsyncResponse
asyncResponse, DispatchRateImpl dispatchRate) {
- validateSuperUserAccessAsync()
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenAccept(__ -> {
log.info("[{}] Set namespace replicator dispatch-rate
{}/{}",
clientAppId(), namespaceName, dispatchRate);
@@ -2563,7 +2569,7 @@ public abstract class NamespacesBase extends
AdminResource {
* Notion: don't re-use this logic.
*/
protected void internalRemoveReplicatorDispatchRate(AsyncResponse
asyncResponse) {
- validateSuperUserAccessAsync()
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenCompose(__ ->
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
String clusterName =
pulsar().getConfiguration().getClusterName();
policies.replicatorDispatchRate.remove(clusterName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 8c2195a9b9b..1e4ac7d9f5f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -466,7 +466,8 @@ public class Namespaces extends NamespacesBase {
@PathParam("tenant") String
tenant,
@PathParam("namespace")
String namespace) {
validateNamespaceName(tenant, namespace);
- validateAdminAccessForTenantAsync(tenant)
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.SUBSCRIPTION_EXPIRATION_TIME,
+ PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies ->
asyncResponse.resume(policies.subscription_expiration_time_minutes))
.exceptionally(ex -> {