This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dc1429dd9276bd30319012aabfb12a54ed11053f Author: Omar Yasin <[email protected]> AuthorDate: Wed Dec 11 04:57:47 2024 -0800 [improve][admin] Opt-out of topic-existence check (#23709) Co-authored-by: Ómar Yasin <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit f571aa1e6c247bea54d7a10ed508991fee1ea71b) --- .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++ .../broker/admin/impl/PersistentTopicsBase.java | 37 ++++++++++++++++++---- .../apache/pulsar/broker/admin/AdminApiTest.java | 20 ++++++++++++ 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index cb74396cbf5..6189ad27b5d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3350,6 +3350,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory"; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Opt-out of topic-existence check when setting permissions" + ) + private boolean allowAclChangesOnNonExistentTopics = false; + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 36a8394eedb..58a86f1a40f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -205,8 +205,16 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - return validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture<Void> validateAccessForTenantCf = + validateAdminAccessForTenantAsync(namespaceName.getTenant()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + return validateAccessForTenantCf .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -257,9 +265,16 @@ public class PersistentTopicsBase extends AdminResource { protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role, Set<AuthAction> actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture<Void> validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + validateAccessForTenantCf .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -272,8 +287,16 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges - validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> internalCheckTopicExists(topicName)) + CompletableFuture<Void> validateAccessForTenantCf = + validateAdminAccessForTenantAsync(namespaceName.getTenant()); + + var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); + if (checkIfTopicExists) { + validateAccessForTenantCf = validateAccessForTenantCf + .thenCompose(__ -> internalCheckTopicExists(topicName)); + } + + validateAccessForTenantCf .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 635b2c25bc1..9ebeaeb7853 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -3710,4 +3711,23 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce))); assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject)); } + + @Test + @SneakyThrows + public void testPermissionsAllowAclChangesOnNonExistentTopics() { + pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true); + try { + String namespace = "prop-xyz/ns1/"; + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://" + namespace + random; + final String subject = UUID.randomUUID().toString(); + admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)); + assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce); + admin.topics().revokePermissions(topic, subject); + assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty(); + } finally { + // reset config + pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false); + } + } }
