This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e96b3398912 [improve][broker] Support get/remove permissions for
AuthorizationProvider (#20496)
e96b3398912 is described below
commit e96b3398912163eb6e0528c10aed3507c95952fd
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Jul 10 14:50:09 2023 +0800
[improve][broker] Support get/remove permissions for AuthorizationProvider
(#20496)
---
.../authorization/AuthorizationProvider.java | 45 +++++++
.../broker/authorization/AuthorizationService.java | 17 +++
.../authorization/PulsarAuthorizationProvider.java | 129 +++++++++++++++++++++
.../broker/admin/impl/PersistentTopicsBase.java | 49 +-------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 8 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 8 +-
.../pulsar/broker/service/BrokerService.java | 23 +---
.../src/main/resources/findbugsExclude.xml | 5 +
8 files changed, 212 insertions(+), 72 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index b54b2089e1e..9ea49fee45c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -428,4 +429,48 @@ public interface AuthorizationProvider extends Closeable {
throw new RestException(e.getCause());
}
}
+
+ /**
+ * Remove authorization-action permissions on a topic.
+ * @param topicName
+ * @return CompletableFuture<Void>
+ */
+ default CompletableFuture<Void> removePermissionsAsync(TopicName
topicName) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("removePermissionsAsync on topicName %s is not
supported by the Authorization",
+ topicName)));
+ }
+
+ /**
+ * Get authorization-action permissions on a topic.
+ * @param topicName
+ * @return CompletableFuture<Map<String, Set<AuthAction>>>
+ */
+ default CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(TopicName topicName) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("getPermissionsAsync on topicName %s is not
supported by the Authorization",
+ topicName)));
+ }
+
+ /**
+ * Get authorization-action permissions on a topic.
+ * @param namespaceName
+ * @return CompletableFuture<Map<String, Set<String>>>
+ */
+ default CompletableFuture<Map<String, Set<String>>>
getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("getSubscriptionPermissionsAsync on namespace %s
is not supported by the Authorization",
+ namespaceName)));
+ }
+
+ /**
+ * Get authorization-action permissions on a namespace.
+ * @param namespaceName
+ * @return CompletableFuture<Map<String, Set<AuthAction>>>
+ */
+ default CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(NamespaceName namespaceName) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("getPermissionsAsync on namespaceName %s is not
supported by the Authorization",
+ namespaceName)));
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 29abcc1eee4..c121d93b9b7 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -815,4 +816,20 @@ public class AuthorizationService {
throw new RestException(e.getCause());
}
}
+
+ public CompletableFuture<Void> removePermissionsAsync(TopicName topicName)
{
+ return provider.removePermissionsAsync(topicName);
+ }
+
+ public CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(TopicName topicName) {
+ return provider.getPermissionsAsync(topicName);
+ }
+
+ public CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(NamespaceName namespaceName) {
+ return provider.getPermissionsAsync(namespaceName);
+ }
+
+ public CompletableFuture<Map<String, Set<String>>>
getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
+ return provider.getSubscriptionPermissionsAsync(namespaceName);
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c96e683d8e8..ece22fe223b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -34,6 +35,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -641,4 +643,131 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
}
+ @Override
+ public CompletableFuture<Void> removePermissionsAsync(TopicName topicName)
{
+ return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ }
+ throw new IllegalStateException("policies are in readonly
mode");
+ }
+ return
pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(policies -> {
+ if (!policies.isPresent()
+ ||
!policies.get().auth_policies.getTopicAuthentication()
+ .containsKey(topicName.toString())) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pulsarResources.getNamespaceResources().
+
setPoliciesAsync(topicName.getNamespaceObject(), policies2 -> {
+
policies2.auth_policies.getTopicAuthentication().remove(topicName.toString());
+ return policies2;
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove permissions on
topic {}", topicName, ex);
+ } else {
+ log.info("Successfully remove permissions
on topic {}", topicName);
+ }
+ });
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(TopicName topicName) {
+ return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ }
+ throw new IllegalStateException("policies are in readonly
mode");
+ }
+ return
pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+ .thenApply(policies -> {
+ if (!policies.isPresent()) {
+ throw new RestException(Response.Status.NOT_FOUND,
"Namespace does not exist");
+ }
+ Map<String, Set<AuthAction>> permissions = new
HashMap<>();
+ String topicUri = topicName.toString();
+ AuthPolicies auth = policies.get().auth_policies;
+ // First add namespace level permissions
+ permissions.putAll(auth.getNamespaceAuthentication());
+ // Then add topic level permissions
+ if
(auth.getTopicAuthentication().containsKey(topicUri)) {
+ for (Map.Entry<String, Set<AuthAction>> entry :
+
auth.getTopicAuthentication().get(topicUri).entrySet()) {
+ String role = entry.getKey();
+ Set<AuthAction> topicPermissions =
entry.getValue();
+
+ if (!permissions.containsKey(role)) {
+ permissions.put(role, topicPermissions);
+ } else {
+ // Do the union between namespace and
topic level
+ Set<AuthAction> union =
Sets.union(permissions.get(role), topicPermissions);
+ permissions.put(role, union);
+ }
+ }
+ }
+ return permissions;
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to get permissions on topic {}",
topicName, ex);
+ } else {
+ log.info("Successfully get permissions on topic
{}", topicName);
+ }
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Set<String>>>
getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
+ return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ }
+ throw new IllegalStateException("policies are in readonly
mode");
+ }
+ return
pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName)
+ .thenApply(policies -> {
+ if (!policies.isPresent()) {
+ throw new RestException(Response.Status.NOT_FOUND,
"Namespace does not exist");
+ }
+
+ return
policies.get().auth_policies.getSubscriptionAuthentication();
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to get subscription permissions
on namespace {}", namespaceName, ex);
+ } else {
+ log.info("Successfully get subscription
permissions on namespaceName {}", namespaceName);
+ }
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Set<AuthAction>>>
getPermissionsAsync(NamespaceName namespaceName) {
+ return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ }
+ throw new IllegalStateException("policies are in readonly
mode");
+ }
+ return
pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName)
+ .thenApply(policies -> {
+ if (!policies.isPresent()) {
+ throw new RestException(Response.Status.NOT_FOUND,
"Namespace does not exist");
+ }
+ return
policies.get().auth_policies.getNamespaceAuthentication();
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to get permissions on
namespaceName {}", namespaceName, ex);
+ } else {
+ log.info("Successfully get permissions on
namespaceName {}", namespaceName);
+ }
+ });
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9878cc1592e..a57722be4e1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -113,7 +113,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
@@ -218,36 +217,7 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Map<String, Set<AuthAction>>>
internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ ->
namespaceResources().getPoliciesAsync(namespaceName)
- .thenApply(policies -> {
- if (!policies.isPresent()) {
- throw new RestException(Status.NOT_FOUND, "Namespace does
not exist");
- }
-
- Map<String, Set<AuthAction>> permissions = new HashMap<>();
- String topicUri = topicName.toString();
- AuthPolicies auth = policies.get().auth_policies;
- // First add namespace level permissions
- permissions.putAll(auth.getNamespaceAuthentication());
-
- // Then add topic level permissions
- if (auth.getTopicAuthentication().containsKey(topicUri)) {
- for (Map.Entry<String, Set<AuthAction>> entry :
-
auth.getTopicAuthentication().get(topicUri).entrySet()) {
- String role = entry.getKey();
- Set<AuthAction> topicPermissions = entry.getValue();
-
- if (!permissions.containsKey(role)) {
- permissions.put(role, topicPermissions);
- } else {
- // Do the union between namespace and topic level
- Set<AuthAction> union =
Sets.union(permissions.get(role), topicPermissions);
- permissions.put(role, union);
- }
- }
- }
- return permissions;
- }));
+ .thenCompose(__ ->
getAuthorizationService().getPermissionsAsync(topicName));
}
protected void validateCreateTopic(TopicName topicName) {
@@ -746,7 +716,7 @@ public class PersistentTopicsBase extends AdminResource {
if (numPartitions < 1) {
return CompletableFuture.completedFuture(null);
}
- return
internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
+ return
internalRemovePartitionsAuthenticationPoliciesAsync()
.thenCompose(unused ->
internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when
all its partitions are successfully deleted
@@ -788,10 +758,10 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int
numPartitions, boolean force) {
return
pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName,
- () ->
internalRemovePartitionsTopicNoAutocreationDisableAsync(numPartitions, force));
+ () ->
internalRemovePartitionsTopicNoAutoCreationDisableAsync(numPartitions, force));
}
- private CompletableFuture<Void>
internalRemovePartitionsTopicNoAutocreationDisableAsync(int numPartitions,
+ private CompletableFuture<Void>
internalRemovePartitionsTopicNoAutoCreationDisableAsync(int numPartitions,
boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
@@ -833,16 +803,9 @@ public class PersistentTopicsBase extends AdminResource {
}).collect(Collectors.toList()));
}
- private CompletableFuture<Void>
internalRemovePartitionsAuthenticationPoliciesAsync(int numPartitions) {
+ private CompletableFuture<Void>
internalRemovePartitionsAuthenticationPoliciesAsync() {
CompletableFuture<Void> future = new CompletableFuture<>();
- pulsar().getPulsarResources().getNamespaceResources()
- .setPoliciesAsync(topicName.getNamespaceObject(), p -> {
- IntStream.range(0, numPartitions)
- .forEach(i ->
p.auth_policies.getTopicAuthentication()
-
.remove(topicName.getPartition(i).toString()));
-
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
- return p;
- })
+ getAuthorizationService().removePermissionsAsync(topicName)
.whenComplete((r, ex) -> {
if (ex != null){
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 153e29506c3..234d7725113 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -293,8 +293,8 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property,
namespace), NamespaceOperation.GET_PERMISSION)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenAccept(policies ->
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+ .thenCompose(__ ->
getAuthorizationService().getPermissionsAsync(namespaceName))
+ .thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("Failed to get permissions for namespace {}",
namespaceName, ex);
resumeAsyncResponseExceptionally(response, ex);
@@ -314,8 +314,8 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String
namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property,
namespace), NamespaceOperation.GET_PERMISSION)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenAccept(policies ->
response.resume(policies.auth_policies.getSubscriptionAuthentication()))
+ .thenCompose(__ ->
getAuthorizationService().getSubscriptionPermissionsAsync(namespaceName))
+ .thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("[{}] Failed to get permissions on subscription
for namespace {}: {} ",
clientAppId(), namespaceName,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b4f1194f92f..dfa040baec5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -240,8 +240,8 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace")
String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_PERMISSION)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenAccept(policies ->
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+ .thenCompose(__ ->
getAuthorizationService().getPermissionsAsync(namespaceName))
+ .thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("Failed to get permissions for namespace {}",
namespaceName, ex);
resumeAsyncResponseExceptionally(response, ex);
@@ -260,8 +260,8 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String
namespace) {
validateNamespaceName(tenant, namespace);
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_PERMISSION)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenAccept(policies ->
response.resume(policies.auth_policies.getSubscriptionAuthentication()))
+ .thenCompose(__ ->
getAuthorizationService().getSubscriptionPermissionsAsync(namespaceName))
+ .thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("[{}] Failed to get permissions on subscription
for namespace {}: {} ", clientAppId(),
namespaceName, ex.getCause().getMessage(), ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6e3f129ef22..fb9329c5bae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1186,23 +1186,9 @@ public class BrokerService implements Closeable {
future.completeExceptionally(new MetadataStoreException("The
number of retries has exhausted"));
return;
}
- NamespaceName namespaceName =
TopicName.get(topic).getNamespaceObject();
// Check whether there are auth policies for the topic
-
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies
-> {
- if (!optPolicies.isPresent() ||
!optPolicies.get().auth_policies.getTopicAuthentication()
- .containsKey(topic)) {
- // if there is no auth policy for the topic, just complete and
return
- if (log.isDebugEnabled()) {
- log.debug("Authentication policies not found for topic
{}", topic);
- }
- future.complete(null);
- return;
- }
- pulsar.getPulsarResources().getNamespaceResources()
-
.setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> {
- p.auth_policies.getTopicAuthentication().remove(topic);
- return p;
- }).thenAccept(v -> {
+ authorizationService.removePermissionsAsync(TopicName.get(topic))
+ .thenAccept(v -> {
log.info("Successfully delete authentication policies
for topic {}", topic);
future.complete(null);
}).exceptionally(ex1 -> {
@@ -1218,11 +1204,6 @@ public class BrokerService implements Closeable {
}
return null;
});
- }).exceptionally(ex -> {
- log.error("Failed to get policies for topic {}", topic, ex);
- future.completeExceptionally(ex);
- return null;
- });
}
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
diff --git a/pulsar-websocket/src/main/resources/findbugsExclude.xml
b/pulsar-websocket/src/main/resources/findbugsExclude.xml
index b7f6b0bf31d..c96e63cdfcc 100644
--- a/pulsar-websocket/src/main/resources/findbugsExclude.xml
+++ b/pulsar-websocket/src/main/resources/findbugsExclude.xml
@@ -199,4 +199,9 @@
<Method name="<init>"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.websocket.WebSocketService"/>
+ <Method name="getAuthorizationService"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
</FindBugsFilter>
\ No newline at end of file