michaeljmarshall commented on code in PR #16792:
URL: https://github.com/apache/pulsar/pull/16792#discussion_r960227112
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final
AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- getPartitionedTopicMetadataAsync(topicName, true, false)
- .thenCompose(metadata -> {
- int numPartitions = metadata.partitions;
- CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- future = future.thenCompose(unused ->
grantPermissionsAsync(topicNamePartition, role,
- actions));
- }
- }
Review Comment:
To keep this change backwards compatible, I think we need to leave this
block in place (at least in the PR that is focused on fixing the historical
bug). Otherwise, the behavior of custom implementations of the
`AuthorizationProvider` interface would break, which shouldn't happen in a
patch release.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -271,6 +271,24 @@ protected CompletableFuture<Map<String, Set<AuthAction>>>
internalGetPermissions
}
}
}
+
+ // If topic is partitioned, add based topic permission
+ if (topicName.isPartitioned() &&
auth.getTopicAuthentication().containsKey(
+ topicName.getPartitionedTopicName())) {
+ for (Map.Entry<String, Set<AuthAction>> entry :
+
auth.getTopicAuthentication().get(topicName.getPartitionedTopicName()).entrySet())
{
+ String role = entry.getKey();
+ Set<AuthAction> topicPermissions = entry.getValue();
+
+ if (!permissions.containsKey(role)) {
+ permissions.put(role, topicPermissions);
+ } else {
+ // Do the union between namespace and topic level
+ Set<AuthAction> union =
Sets.union(permissions.get(role), topicPermissions);
+ permissions.put(role, union);
+ }
+ }
+ }
Review Comment:
I am guessing this is meant to grant permission based on a topic's partition
name. This code block alone will be insufficient; we'll need to look at the
`AuthorizationProvider` to get a complete implementation.
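To illustrate the point, here is a minimal sketch of the lookup an authorization
check would also need: when asked about a partition, fall back to the
permissions stored under the base topic name. This is not the actual
`AuthorizationProvider` code. `PartitionFallbackSketch`, `basePartitionedName`,
and `isAllowed` are hypothetical names, actions are plain strings rather than
`AuthAction`, and the `-partition-N` suffix handling is a simplified stand-in
for `TopicName.getPartitionedTopicName()`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Pulsar's AuthorizationProvider implementation.
class PartitionFallbackSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    // Simplified stand-in for TopicName.getPartitionedTopicName():
    // strips a trailing "-partition-<N>" if present, otherwise returns the input.
    static String basePartitionedName(String topic) {
        int idx = topic.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return topic;
        }
        String tail = topic.substring(idx + PARTITION_SUFFIX.length());
        if (tail.isEmpty() || !tail.chars().allMatch(Character::isDigit)) {
            return topic;
        }
        return topic.substring(0, idx);
    }

    // Allows the action if it is granted on the exact topic name or,
    // for a partition, on the base partitioned topic name.
    static boolean isAllowed(Map<String, Map<String, Set<String>>> topicAuth,
                             String topic, String role, String action) {
        if (hasAction(topicAuth, topic, role, action)) {
            return true;
        }
        String base = basePartitionedName(topic);
        return !base.equals(topic) && hasAction(topicAuth, base, role, action);
    }

    private static boolean hasAction(Map<String, Map<String, Set<String>>> topicAuth,
                                     String topic, String role, String action) {
        Map<String, Set<String>> roles = topicAuth.get(topic);
        if (roles == null) {
            return false;
        }
        Set<String> actions = roles.get(role);
        return actions != null && actions.contains(action);
    }
}
```

With a grant stored only under the base name, a check against
`...-partition-0` succeeds through the fallback, while a different role or an
unrelated topic is still rejected.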
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final
AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- getPartitionedTopicMetadataAsync(topicName, true, false)
- .thenCompose(metadata -> {
- int numPartitions = metadata.partitions;
- CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- future = future.thenCompose(unused ->
grantPermissionsAsync(topicNamePartition, role,
- actions));
- }
- }
- return future.thenCompose(unused ->
grantPermissionsAsync(topicName, role, actions))
- .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
- }))).exceptionally(ex -> {
+
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role,
actions)
Review Comment:
Instead of passing the topic name without any of the partition information,
I think we should grant permission to the result `topicName` and leave it up
to the `AuthorizationProvider` to determine how to handle partitioned topics.
The current method already calls `grantPermissionAsync` with that `topicName`,
so we might be able to simply deprecate passing each partition to the
`AuthorizationProvider`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final
AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- getPartitionedTopicMetadataAsync(topicName, true, false)
- .thenCompose(metadata -> {
- int numPartitions = metadata.partitions;
- CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- future = future.thenCompose(unused ->
grantPermissionsAsync(topicNamePartition, role,
- actions));
- }
- }
- return future.thenCompose(unused ->
grantPermissionsAsync(topicName, role, actions))
- .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
- }))).exceptionally(ex -> {
+
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role,
actions)
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))))
+ .exceptionally(ex -> {
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}
- private CompletableFuture<Void> revokePermissionsAsync(String topicUri,
String role) {
+ private CompletableFuture<Void> revokePermissionsAsync(TopicName
topicName, String role, int numPartitions) {
+ String topicUri = topicName.toString();
return
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
new RestException(Status.NOT_FOUND, "Namespace
does not exist"));
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ // do compatible with previous pulsar version
+ // revoke all the partition permissions granted in
previous version
+ future = future.thenComposeAsync(unused ->
+ namespaceResources().setPoliciesAsync(namespaceName, p
-> {
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+
p.auth_policies.getTopicAuthentication().computeIfPresent(
+
topicName.getPartition(i).toString(), (k, roles) -> {
+ roles.remove(role);
+ if (roles.isEmpty()) {
+ return null;
+ }
+ return roles;
+ });
+ }
+ }
+ return p;
+ }));
if
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
||
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
{
Review Comment:
I wonder if instead of failing here, we should just attempt to remove
permission on every partition, even if it isn't in the map? I think that would
be the expected behavior. This would also be backwards compatible.
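A minimal sketch of that lenient revoke, assuming the topic-level permissions
are modeled as a plain nested map (topic name, then role, then actions).
`LenientRevokeSketch` and `revokeEverywhere` are hypothetical names, and the
partition names are built by string concatenation here as a stand-in for
`topicName.getPartition(i).toString()`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a revoke that never fails on missing entries.
class LenientRevokeSketch {

    // Removes `role` from the base topic and from every partition entry.
    // Entries that are absent are skipped instead of raising an error.
    // Returns how many entries actually had the role removed.
    static int revokeEverywhere(Map<String, Map<String, Set<String>>> topicAuth,
                                String baseTopic, int numPartitions, String role) {
        int removed = removeRole(topicAuth, baseTopic, role);
        for (int i = 0; i < numPartitions; i++) {
            removed += removeRole(topicAuth, baseTopic + "-partition-" + i, role);
        }
        return removed;
    }

    private static int removeRole(Map<String, Map<String, Set<String>>> topicAuth,
                                  String topic, String role) {
        Map<String, Set<String>> roles = topicAuth.get(topic);
        if (roles == null || roles.remove(role) == null) {
            return 0; // nothing stored for this topic or role: not an error
        }
        if (roles.isEmpty()) {
            topicAuth.remove(topic); // drop the now-empty topic entry
        }
        return 1;
    }
}
```

Calling it a second time with the same arguments removes nothing and returns
0, so the operation is idempotent for callers that retry.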