This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 8afa570 [Authorization] Support UNSUBSCRIBE namespace op after enable
auth (#12742)
8afa570 is described below
commit 8afa570f1cdf4a4acfa34de330e855f819c2ae63
Author: Ruguo Yu <[email protected]>
AuthorDate: Fri Nov 12 10:29:56 2021 +0800
[Authorization] Support UNSUBSCRIBE namespace op after enable auth (#12742)
### Motivation
Currently, we can `unsubscribe` the given subscription on all topics on a
namespace through `bin/pulsar-admin namespaces unsubscribe -s sub tn1/ns1`.
However, role(not super-user or administrator) with `consume` auth action for
namespace cannot perform `unsubscribe` operation when enable auth.
The root of the problem is that `PulsarAuthorizationProvider` lacks support
for namespace operation `UNSUBSCRIBE` when verifying the role's authorization,
code as below:
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1667-L1669
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L522-L536
The purpose of this PR is to support that role with `consume` namespace
authorization could `unsubscribe` subscriptions on a namespace.
(cherry picked from commit 8926631db9c9a78341726b53ca119ad4c69720e8)
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../api/AuthorizationProducerConsumerTest.java | 42 +++-
.../auth/admin/GetMetadataOfTopicWithAuthTest.java | 213 ---------------------
3 files changed, 42 insertions(+), 214 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 641591c..d7d4531 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -531,6 +531,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
isAuthorizedFuture =
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData,
AuthAction.packages);
break;
case GET_TOPICS:
+ case UNSUBSCRIBE:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role,
authData);
break;
default:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0a799fb..0987a333 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.api;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Maps;
@@ -27,6 +29,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
@@ -175,6 +179,7 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
final String tenantRole = "tenant-role";
final String subscriptionRole = "sub1-role";
final String subscriptionName = "sub1";
+ final String subscriptionName2 = "sub2";
final String namespace = "my-property/my-ns-sub-auth";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new
ClientAuthentication("superUser");
@@ -202,7 +207,18 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole),
Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace,
Sets.newHashSet("test"));
- tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+
+ // subscriptionRole doesn't have topic-level authorization, so it will
fail to get topic stats-internal info
+ try {
+ sub1Admin.topics().getInternalStats(topicName, true);
+ fail("should have failed with authorization exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith(
+ "Unauthorized to validateTopicOperation for operation
[GET_STATS]"));
+ }
+
+ // grant topic consume authorization to the subscriptionRole
+ tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
replacePulsarClient(PulsarClient.builder()
@@ -212,7 +228,17 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
// (1) Create subscription name
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName2)
+ .subscribe();
consumer.close();
+ consumer2.close();
+
+ List<String> subscriptions =
sub1Admin.topics().getSubscriptions(topicName);
+ assertEquals(subscriptions.size(), 2);
+
+ // now, subscriptionRole have consume authorization on topic, so it
will successfully get topic internal stats
+ PersistentTopicInternalStats internalStats =
superAdmin.topics().getInternalStats(topicName, true);
+ assertNotNull(internalStats);
// verify tenant is able to perform all subscription-admin api
tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
@@ -228,10 +254,24 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
tenantAdmin.topics().resetCursor(topicName, subscriptionName,
MessageId.earliest);
+ // subscriptionRole doesn't have namespace-level authorization, so it
will fail to unsubscribe namespace
+ try {
+ sub1Admin.namespaces().unsubscribeNamespace(namespace,
subscriptionName2);
+ fail("should have failed with authorization exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith(
+ "Unauthorized to validateNamespaceOperation for operation
[UNSUBSCRIBE]"));
+ }
+
// grant namespace-level authorization to the subscriptionRole
tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
Collections.singleton(AuthAction.consume));
+ // now, subscriptionRole have consume authorization on namespace, so
it will successfully unsubscribe namespace
+ superAdmin.namespaces().unsubscribeNamespaceBundle(namespace,
"0x00000000_0xffffffff", subscriptionName2);
+ subscriptions = sub1Admin.topics().getSubscriptions(topicName);
+ assertEquals(subscriptions.size(), 1);
+
// subscriptionRole has namespace-level authorization
sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
deleted file mode 100644
index 7578629..0000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.auth.admin;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-import com.google.common.io.Files;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.tests.TestRetrySupport;
-import org.apache.pulsar.tests.integration.containers.PulsarContainer;
-import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.utils.DockerUtils;
-import org.elasticsearch.common.collect.Set;
-import org.testcontainers.containers.Network;
-import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and
without the proper permission.
- */
-@Slf4j
-public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {
-
- private static final String CLUSTER_PREFIX = "get-metadata-auth";
- private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER =
"/tmp/private.key";
- private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER =
"/tmp/public.key";
-
- private static final String SUPER_USER_ROLE = "super-user";
- private String superUserAuthToken;
- private static final String PROXY_ROLE = "proxy";
- private String proxyAuthToken;
- private static final String REGULAR_USER_ROLE = "client";
- private String clientAuthToken;
- private File publicKeyFile;
-
- private PulsarCluster pulsarCluster;
- private PulsarContainer cmdContainer;
-
- @Override
- @BeforeClass(alwaysRun = true)
- protected void setup() throws Exception {
- incrementSetupNumber();
- // Before starting the cluster, generate the secret key and the token
- // Use Zk container to have 1 container available before starting the
cluster
- final String clusterName = String.format("%s-%s", CLUSTER_PREFIX,
RandomStringUtils.randomAlphabetic(6));
- final String cliContainerName = String.format("%s-%s", "cli",
RandomStringUtils.randomAlphabetic(6));
- cmdContainer = new ZKContainer<>(cliContainerName);
- cmdContainer
- .withNetwork(Network.newNetwork())
- .withNetworkAliases(ZKContainer.NAME)
- .withEnv("zkServers", ZKContainer.NAME);
- cmdContainer.start();
-
- createKeysAndTokens(cmdContainer);
-
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .numBookies(2)
- .numBrokers(2)
- .numProxies(1)
- .clusterName(clusterName)
- .brokerEnvs(getBrokerSettingsEnvs())
- .proxyEnvs(getProxySettingsEnvs())
-
.brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(),
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
-
.proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(),
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
- .build();
-
- pulsarCluster = PulsarCluster.forSpec(spec);
- pulsarCluster.start();
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- public void cleanup() {
- markCurrentSetupNumberCleaned();
- if (cmdContainer != null) {
- cmdContainer.stop();
- }
- if (pulsarCluster != null) {
- pulsarCluster.stop();
- }
- }
-
- private Map<String, String> getBrokerSettingsEnvs() {
- Map<String, String> envs = new HashMap<>();
- envs.put("authenticationEnabled", "true");
- envs.put("authenticationProviders",
AuthenticationProviderToken.class.getName());
- envs.put("authorizationEnabled", "true");
- envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE,
PROXY_ROLE));
- envs.put("brokerClientAuthenticationPlugin",
AuthenticationToken.class.getName());
- envs.put("brokerClientAuthenticationParameters",
String.format("token:%s", superUserAuthToken));
- envs.put("authenticationRefreshCheckSeconds", "1");
- envs.put("authenticateOriginalAuthData", "true");
- envs.put("tokenPublicKey", "file://" +
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
- return envs;
- }
-
- private Map<String, String> getProxySettingsEnvs() {
- Map<String, String> envs = new HashMap<>();
- envs.put("authenticationEnabled", "true");
- envs.put("authenticationProviders",
AuthenticationProviderToken.class.getName());
- envs.put("authorizationEnabled", "true");
- envs.put("brokerClientAuthenticationPlugin",
AuthenticationToken.class.getName());
- envs.put("brokerClientAuthenticationParameters",
String.format("token:%s", proxyAuthToken));
- envs.put("authenticationRefreshCheckSeconds", "1");
- envs.put("forwardAuthorizationCredentials", "true");
- envs.put("tokenPublicKey", "file://" +
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
- return envs;
- }
-
- protected void createKeysAndTokens(PulsarContainer container) throws
Exception {
- container
- .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens",
"create-key-pair",
- "--output-private-key",
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
- "--output-public-key",
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-
- byte[] publicKeyBytes = DockerUtils
- .runCommandWithRawOutput(container.getDockerClient(),
container.getContainerId(),
- "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
- .getStdout();
-
- publicKeyFile = File.createTempFile("public-", ".key", new
File("/tmp"));
- Files.write(publicKeyBytes, publicKeyFile);
-
- clientAuthToken = container
- .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens",
"create",
- "--private-key", "file://" +
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
- "--subject", REGULAR_USER_ROLE)
- .getStdout().trim();
- log.info("Created client token: {}", clientAuthToken);
-
- superUserAuthToken = container
- .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens",
"create",
- "--private-key", "file://" +
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
- "--subject", SUPER_USER_ROLE)
- .getStdout().trim();
- log.info("Created super-user token: {}", superUserAuthToken);
-
- proxyAuthToken = container
- .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens",
"create",
- "--private-key", "file://" +
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
- "--subject", PROXY_ROLE)
- .getStdout().trim();
- log.info("Created proxy token: {}", proxyAuthToken);
- }
-
- @Test
- public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
- @Cleanup
- PulsarAdmin superUserAdmin = PulsarAdmin.builder()
- .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
-
.authentication(AuthenticationFactory.token(superUserAuthToken))
- .build();
-
- @Cleanup
- PulsarAdmin clientAdmin = PulsarAdmin.builder()
- .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
- .authentication(AuthenticationFactory.token(clientAuthToken))
- .build();
-
- // create partitioned topic
- superUserAdmin.topics().createPartitionedTopic("public/default/test",
1);
-
- // do some operation without grant any permissions
- try {
-
clientAdmin.topics().getInternalStats("public/default/test-partition-0", true);
- fail("get internal stats and metadata operation should fail
because the client hasn't permission to do");
- } catch (PulsarAdminException e) {
- assertEquals(e.getStatusCode(), 401);
- }
-
- // grant consume/produce permission to the role
- superUserAdmin.topics().grantPermission("public/default/test",
- REGULAR_USER_ROLE, Set.of(AuthAction.consume));
-
- // then do some get internal stats and metadata operations again, it
should success
- PersistentTopicInternalStats internalStats = clientAdmin.topics()
- .getInternalStats("public/default/test-partition-0", true);
- assertNotNull(internalStats);
- }
-}