nodece commented on code in PR #16792:
URL: https://github.com/apache/pulsar/pull/16792#discussion_r962444260
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final
AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- getPartitionedTopicMetadataAsync(topicName, true, false)
- .thenCompose(metadata -> {
- int numPartitions = metadata.partitions;
- CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- future = future.thenCompose(unused ->
grantPermissionsAsync(topicNamePartition, role,
- actions));
- }
- }
- return future.thenCompose(unused ->
grantPermissionsAsync(topicName, role, actions))
- .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
- }))).exceptionally(ex -> {
+
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role,
actions)
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))))
+ .exceptionally(ex -> {
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}
- private CompletableFuture<Void> revokePermissionsAsync(String topicUri,
String role) {
+ private CompletableFuture<Void> revokePermissionsAsync(TopicName
topicName, String role, int numPartitions) {
+ String topicUri = topicName.toString();
return
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
new RestException(Status.NOT_FOUND, "Namespace
does not exist"));
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ // do compatible with previous pulsar version
+ // revoke all the partition permissions granted in
previous version
+ future = future.thenComposeAsync(unused ->
+ namespaceResources().setPoliciesAsync(namespaceName, p
-> {
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+
p.auth_policies.getTopicAuthentication().computeIfPresent(
+
topicName.getPartition(i).toString(), (k, roles) -> {
+ roles.remove(role);
+ if (roles.isEmpty()) {
+ return null;
+ }
+ return roles;
+ });
+ }
+ }
+ return p;
+ }));
if
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
||
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
{
Review Comment:
Fixed by #17393.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org