This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc1429dd9276bd30319012aabfb12a54ed11053f
Author: Omar Yasin <[email protected]>
AuthorDate: Wed Dec 11 04:57:47 2024 -0800

    [improve][admin] Opt-out of topic-existence check (#23709)
    
    Co-authored-by: Ă“mar Yasin <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit f571aa1e6c247bea54d7a10ed508991fee1ea71b)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 37 ++++++++++++++++++----
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 20 ++++++++++++
 3 files changed, 56 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cb74396cbf5..6189ad27b5d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3350,6 +3350,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String compactionServiceFactoryClassName = 
"org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Opt-out of topic-existence check when setting permissions"
+    )
+    private boolean allowAclChangesOnNonExistentTopics = false;
+
     /**** --- KeyStore TLS config variables. --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 36a8394eedb..58a86f1a40f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -205,8 +205,16 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        return validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf =
+                validateAdminAccessForTenantAsync(namespaceName.getTenant());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        return validateAccessForTenantCf
                 .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName));
     }
 
@@ -257,9 +265,16 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalGrantPermissionsOnTopic(final AsyncResponse 
asyncResponse, String role,
                                                    Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf = 
validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        validateAccessForTenantCf
                 .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, 
actions))
                 .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -272,8 +287,16 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf =
+                validateAdminAccessForTenantAsync(namespaceName.getTenant());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        validateAccessForTenantCf
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
true, false)
                 .thenCompose(metadata -> {
                     int numPartitions = metadata.partitions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 635b2c25bc1..9ebeaeb7853 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -3710,4 +3711,23 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertThrows(NotFoundException.class, () -> 
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
         assertThrows(NotFoundException.class, () -> 
admin.topics().revokePermissions(topic, subject));
     }
+
+    @Test
+    @SneakyThrows
+    public void testPermissionsAllowAclChangesOnNonExistentTopics() {
+        pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true);
+        try {
+            String namespace = "prop-xyz/ns1/";
+            final String random = UUID.randomUUID().toString();
+            final String topic = "persistent://" + namespace + random;
+            final String subject = UUID.randomUUID().toString();
+            admin.topics().grantPermission(topic, subject, 
Set.of(AuthAction.produce));
+            
assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce);
+            admin.topics().revokePermissions(topic, subject);
+            
assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty();
+        } finally {
+            // reset config
+            
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false);
+        }
+    }
 }

Reply via email to