This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f571aa1e6c2 [improve][admin] Opt-out of topic-existence check (#23709)
f571aa1e6c2 is described below
commit f571aa1e6c247bea54d7a10ed508991fee1ea71b
Author: Omar Yasin <[email protected]>
AuthorDate: Wed Dec 11 04:57:47 2024 -0800
[improve][admin] Opt-out of topic-existence check (#23709)
Co-authored-by: Ómar Yasin <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++
.../broker/admin/impl/PersistentTopicsBase.java | 37 ++++++++++++++++++----
.../apache/pulsar/broker/admin/AdminApiTest.java | 20 ++++++++++++
3 files changed, 56 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8b5a4ef270b..0f7ae00713d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3535,6 +3535,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String compactionServiceFactoryClassName =
"org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Opt-out of topic-existence check when setting permissions"
+ )
+ private boolean allowAclChangesOnNonExistentTopics = false;
+
/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6070093cc35..9a306f6b4ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -206,8 +206,16 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Map<String, Set<AuthAction>>>
internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
- return validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ -> internalCheckTopicExists(topicName))
+ CompletableFuture<Void> validateAccessForTenantCf =
+ validateAdminAccessForTenantAsync(namespaceName.getTenant());
+
+ var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+ if (checkIfTopicExists) {
+ validateAccessForTenantCf = validateAccessForTenantCf
+ .thenCompose(__ -> internalCheckTopicExists(topicName));
+ }
+
+ return validateAccessForTenantCf
.thenCompose(__ ->
getAuthorizationService().getPermissionsAsync(topicName));
}
@@ -258,9 +266,16 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGrantPermissionsOnTopic(final AsyncResponse
asyncResponse, String role,
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
- validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ -> internalCheckTopicExists(topicName))
+ CompletableFuture<Void> validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
+
+ var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+ if (checkIfTopicExists) {
+ validateAccessForTenantCf = validateAccessForTenantCf
+ .thenCompose(__ -> internalCheckTopicExists(topicName));
+ }
+
+ validateAccessForTenantCf
.thenCompose(unused1 -> grantPermissionsAsync(topicName, role,
actions))
.thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -273,8 +288,16 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalRevokePermissionsOnTopic(AsyncResponse
asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
- validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ -> internalCheckTopicExists(topicName))
+ CompletableFuture<Void> validateAccessForTenantCf =
+ validateAdminAccessForTenantAsync(namespaceName.getTenant());
+
+ var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
+ if (checkIfTopicExists) {
+ validateAccessForTenantCf = validateAccessForTenantCf
+ .thenCompose(__ -> internalCheckTopicExists(topicName));
+ }
+
+ validateAccessForTenantCf
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 70c2b343ec5..cea43cc9345 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -3668,4 +3669,23 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertThrows(NotFoundException.class, () ->
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
assertThrows(NotFoundException.class, () ->
admin.topics().revokePermissions(topic, subject));
}
+
+ @Test
+ @SneakyThrows
+ public void testPermissionsAllowAclChangesOnNonExistentTopics() {
+ pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true);
+ try {
+ String namespace = "prop-xyz/ns1/";
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://" + namespace + random;
+ final String subject = UUID.randomUUID().toString();
+ admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce));
+ assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce);
+ admin.topics().revokePermissions(topic, subject);
+ assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty();
+ } finally {
+ // reset config
+ pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false);
+ }
+ }
}