This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new da9a5ac [Authorization] Support CLEAR_BACKLOG namespace op after
enable auth (#12963)
da9a5ac is described below
commit da9a5acba373f5fb5dfcae3decc57e5d8d2b8846
Author: Ruguo Yu <[email protected]>
AuthorDate: Fri Nov 26 18:42:31 2021 +0800
[Authorization] Support CLEAR_BACKLOG namespace op after enable auth
(#12963)
(cherry picked from commit 64af8df83b7463d7e9231ddabc603705f15d30d6)
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../api/AuthorizationProducerConsumerTest.java | 78 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index d7d4531..774aa38 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -532,6 +532,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
break;
case GET_TOPICS:
case UNSUBSCRIBE:
+ case CLEAR_BACKLOG:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role,
authData);
break;
default:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0987a333..d852640 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -268,7 +268,7 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
Collections.singleton(AuthAction.consume));
// now, subscriptionRole have consume authorization on namespace, so
it will successfully unsubscribe namespace
- superAdmin.namespaces().unsubscribeNamespaceBundle(namespace,
"0x00000000_0xffffffff", subscriptionName2);
+ sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace,
"0x00000000_0xffffffff", subscriptionName2);
subscriptions = sub1Admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 1);
@@ -324,6 +324,82 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
}
@Test
+ public void testClearBacklogPermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ final String subscriptionRole = "sub-role";
+ final String subscriptionName = "sub1";
+ final String namespace = "my-property/my-ns-sub-auth";
+ final String topicName = "persistent://" + namespace + "/my-topic";
+ Authentication adminAuthentication = new
ClientAuthentication("superUser");
+
+ clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+ @Cleanup
+ PulsarAdmin superAdmin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(adminAuthentication).build());
+
+ Authentication subAdminAuthentication = new
ClientAuthentication(subscriptionRole);
+ @Cleanup
+ PulsarAdmin sub1Admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(subAdminAuthentication).build());
+
+ superAdmin.clusters().createCluster("test",
+
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ superAdmin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet(),
Sets.newHashSet("test")));
+ superAdmin.namespaces().createNamespace(namespace,
Sets.newHashSet("test"));
+ superAdmin.topics().createPartitionedTopic(topicName, 1);
+
+ // grant topic consume&produce authorization to the subscriptionRole
+ superAdmin.topics().grantPermission(topicName, subscriptionRole,
+ Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+ replacePulsarClient(PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(subAdminAuthentication));
+
+ @Cleanup
+ Producer<byte[]> batchProducer =
pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName(subscriptionName)
+ .subscribe();
+
+ CompletableFuture<MessageId> completableFuture = new
CompletableFuture<>();
+ for (int i = 0; i < 10; i++) {
+ completableFuture = batchProducer.sendAsync("a".getBytes());
+ }
+ completableFuture.get();
+ assertEquals(sub1Admin.topics().getStats(topicName +
"-partition-0").getSubscriptions()
+ .get(subscriptionName).getMsgBacklog(), 10);
+
+ // subscriptionRole doesn't have namespace-level authorization, so it
will fail to clear backlog
+ try {
+ sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace,
"0x00000000_0xffffffff");
+ fail("should have failed with authorization exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith(
+ "Unauthorized to validateNamespaceOperation for operation
[CLEAR_BACKLOG]"));
+ }
+
+ superAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+ Sets.newHashSet(AuthAction.consume));
+ // now, subscriptionRole has consume authorization on namespace, so
it will successfully clear backlog
+ sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace,
"0x00000000_0xffffffff");
+ assertEquals(sub1Admin.topics().getStats(topicName +
"-partition-0").getSubscriptions()
+ .get(subscriptionName).getMsgBacklog(), 0);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);