This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ef57c2ac44fe9b76ef5aaea308f0f7b2c9d0ab47
Author: Omar Yasin <[email protected]>
AuthorDate: Wed Dec 11 04:57:47 2024 -0800

    [improve][admin] Opt-out of topic-existence check (#23709)
    
    Co-authored-by: Ă“mar Yasin <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit f571aa1e6c247bea54d7a10ed508991fee1ea71b)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 39 +++++++++++++++++-----
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 20 +++++++++++
 3 files changed, 57 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9b0827426a2..12aba309a01 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3255,6 +3255,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Opt-out of topic-existence check when setting permissions"
+    )
+    private boolean allowAclChangesOnNonExistentTopics = false;
+
     /**** --- KeyStore TLS config variables. --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index abd9a37aaf7..62335cebc4a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -218,8 +218,16 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        return validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf =
+                validateAdminAccessForTenantAsync(namespaceName.getTenant());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        return validateAccessForTenantCf
                 .thenCompose(__ -> 
namespaceResources().getPoliciesAsync(namespaceName)
             .thenApply(policies -> {
                 if (!policies.isPresent()) {
@@ -299,9 +307,16 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalGrantPermissionsOnTopic(final AsyncResponse 
asyncResponse, String role,
                                                    Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf = 
validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        validateAccessForTenantCf
                 .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, 
actions))
                 .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
@@ -348,9 +363,17 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> internalCheckTopicExists(topicName))
+        CompletableFuture<Void> validateAccessForTenantCf =
+                validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
+
+        var checkIfTopicExists = 
!pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+        if (checkIfTopicExists) {
+            validateAccessForTenantCf = validateAccessForTenantCf
+                    .thenCompose(__ -> internalCheckTopicExists(topicName));
+        }
+
+        validateAccessForTenantCf
                 .thenCompose(unused1 -> 
getPartitionedTopicMetadataAsync(topicName, true, false)
                 .thenCompose(metadata -> {
                     int numPartitions = metadata.partitions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 697ce784eed..54a34f002d7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -3623,4 +3624,23 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertThrows(NotFoundException.class, () -> 
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
         assertThrows(NotFoundException.class, () -> 
admin.topics().revokePermissions(topic, subject));
     }
+
+    @Test
+    @SneakyThrows
+    public void testPermissionsAllowAclChangesOnNonExistentTopics() {
+        pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true);
+        try {
+            String namespace = "prop-xyz/ns1/";
+            final String random = UUID.randomUUID().toString();
+            final String topic = "persistent://" + namespace + random;
+            final String subject = UUID.randomUUID().toString();
+            admin.topics().grantPermission(topic, subject, 
Set.of(AuthAction.produce));
+            
assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce);
+            admin.topics().revokePermissions(topic, subject);
+            
assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty();
+        } finally {
+            // reset config
+            
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false);
+        }
+    }
 }

Reply via email to