This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 8afa570  [Authorization] Support UNSUBSCRIBE namespace op after enable 
auth (#12742)
8afa570 is described below

commit 8afa570f1cdf4a4acfa34de330e855f819c2ae63
Author: Ruguo Yu <[email protected]>
AuthorDate: Fri Nov 12 10:29:56 2021 +0800

    [Authorization] Support UNSUBSCRIBE namespace op after enable auth (#12742)
    
    ### Motivation
    Currently, we can `unsubscribe` the given subscription on all topics on a 
namespace through `bin/pulsar-admin namespaces unsubscribe -s sub tn1/ns1`. 
However, role(not super-user or administrator) with `consume` auth action for 
namespace cannot perform `unsubscribe` operation when enable auth.
    
    The root of the problem is that `PulsarAuthorizationProvider` lacks support 
for namespace operation `UNSUBSCRIBE` when verifying the role's authorization, 
code as below:
    
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1667-L1669
    
https://github.com/apache/pulsar/blob/8cae63557a318240e95697f382b4f61c22b70d64/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L522-L536
    
    The purpose of this PR is to support that role with `consume` namespace 
authorization could `unsubscribe` subscriptions on a namespace.
    
    (cherry picked from commit 8926631db9c9a78341726b53ca119ad4c69720e8)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../api/AuthorizationProducerConsumerTest.java     |  42 +++-
 .../auth/admin/GetMetadataOfTopicWithAuthTest.java | 213 ---------------------
 3 files changed, 42 insertions(+), 214 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 641591c..d7d4531 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -531,6 +531,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                 isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.packages);
                 break;
             case GET_TOPICS:
+            case UNSUBSCRIBE:
                 isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, 
authData);
                 break;
             default:
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0a799fb..0987a333 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.api;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Maps;
@@ -27,6 +29,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TenantOperation;
@@ -175,6 +179,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         final String tenantRole = "tenant-role";
         final String subscriptionRole = "sub1-role";
         final String subscriptionName = "sub1";
+        final String subscriptionName2 = "sub2";
         final String namespace = "my-property/my-ns-sub-auth";
         final String topicName = "persistent://" + namespace + "/my-topic";
         Authentication adminAuthentication = new 
ClientAuthentication("superUser");
@@ -202,7 +207,18 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         superAdmin.tenants().createTenant("my-property",
                 new TenantInfoImpl(Sets.newHashSet(tenantRole), 
Sets.newHashSet("test")));
         superAdmin.namespaces().createNamespace(namespace, 
Sets.newHashSet("test"));
-        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
+
+        // subscriptionRole doesn't have topic-level authorization, so it will 
fail to get topic stats-internal info
+        try {
+            sub1Admin.topics().getInternalStats(topicName, true);
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateTopicOperation for operation 
[GET_STATS]"));
+        }
+
+        // grant topic consume authorization to the subscriptionRole
+        tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
         replacePulsarClient(PulsarClient.builder()
@@ -212,7 +228,17 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         // (1) Create subscription name
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscribe();
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName2)
+                .subscribe();
         consumer.close();
+        consumer2.close();
+
+        List<String> subscriptions = 
sub1Admin.topics().getSubscriptions(topicName);
+        assertEquals(subscriptions.size(), 2);
+
+        // now, subscriptionRole have consume authorization on topic, so it 
will successfully get topic internal stats
+        PersistentTopicInternalStats internalStats = 
superAdmin.topics().getInternalStats(topicName, true);
+        assertNotNull(internalStats);
 
         // verify tenant is able to perform all subscription-admin api
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
@@ -228,10 +254,24 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
 
+        // subscriptionRole doesn't have namespace-level authorization, so it 
will fail to unsubscribe namespace
+        try {
+            sub1Admin.namespaces().unsubscribeNamespace(namespace, 
subscriptionName2);
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateNamespaceOperation for operation 
[UNSUBSCRIBE]"));
+        }
+
         // grant namespace-level authorization to the subscriptionRole
         tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
+        // now, subscriptionRole have consume authorization on namespace, so 
it will successfully unsubscribe namespace
+        superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, 
"0x00000000_0xffffffff", subscriptionName2);
+        subscriptions = sub1Admin.topics().getSubscriptions(topicName);
+        assertEquals(subscriptions.size(), 1);
+
         // subscriptionRole has namespace-level authorization
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
deleted file mode 100644
index 7578629..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.auth.admin;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-import com.google.common.io.Files;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.tests.TestRetrySupport;
-import org.apache.pulsar.tests.integration.containers.PulsarContainer;
-import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.utils.DockerUtils;
-import org.elasticsearch.common.collect.Set;
-import org.testcontainers.containers.Network;
-import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and 
without the proper permission.
- */
-@Slf4j
-public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {
-
-    private static final String CLUSTER_PREFIX = "get-metadata-auth";
-    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = 
"/tmp/private.key";
-    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = 
"/tmp/public.key";
-
-    private static final String SUPER_USER_ROLE = "super-user";
-    private String superUserAuthToken;
-    private static final String PROXY_ROLE = "proxy";
-    private String proxyAuthToken;
-    private static final String REGULAR_USER_ROLE = "client";
-    private String clientAuthToken;
-    private File publicKeyFile;
-
-    private PulsarCluster pulsarCluster;
-    private PulsarContainer cmdContainer;
-
-    @Override
-    @BeforeClass(alwaysRun = true)
-    protected void setup() throws Exception {
-        incrementSetupNumber();
-        // Before starting the cluster, generate the secret key and the token
-        // Use Zk container to have 1 container available before starting the 
cluster
-        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, 
RandomStringUtils.randomAlphabetic(6));
-        final String cliContainerName = String.format("%s-%s", "cli", 
RandomStringUtils.randomAlphabetic(6));
-        cmdContainer = new ZKContainer<>(cliContainerName);
-        cmdContainer
-                .withNetwork(Network.newNetwork())
-                .withNetworkAliases(ZKContainer.NAME)
-                .withEnv("zkServers", ZKContainer.NAME);
-        cmdContainer.start();
-
-        createKeysAndTokens(cmdContainer);
-
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-                .numBookies(2)
-                .numBrokers(2)
-                .numProxies(1)
-                .clusterName(clusterName)
-                .brokerEnvs(getBrokerSettingsEnvs())
-                .proxyEnvs(getProxySettingsEnvs())
-                
.brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), 
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
-                
.proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), 
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
-                .build();
-
-        pulsarCluster = PulsarCluster.forSpec(spec);
-        pulsarCluster.start();
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    public void cleanup() {
-        markCurrentSetupNumberCleaned();
-        if (cmdContainer != null) {
-            cmdContainer.stop();
-        }
-        if (pulsarCluster != null) {
-            pulsarCluster.stop();
-        }
-    }
-
-    private Map<String, String> getBrokerSettingsEnvs() {
-        Map<String, String> envs = new HashMap<>();
-        envs.put("authenticationEnabled", "true");
-        envs.put("authenticationProviders", 
AuthenticationProviderToken.class.getName());
-        envs.put("authorizationEnabled", "true");
-        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, 
PROXY_ROLE));
-        envs.put("brokerClientAuthenticationPlugin", 
AuthenticationToken.class.getName());
-        envs.put("brokerClientAuthenticationParameters", 
String.format("token:%s", superUserAuthToken));
-        envs.put("authenticationRefreshCheckSeconds", "1");
-        envs.put("authenticateOriginalAuthData", "true");
-        envs.put("tokenPublicKey", "file://" + 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-        return envs;
-    }
-
-    private Map<String, String> getProxySettingsEnvs() {
-        Map<String, String> envs = new HashMap<>();
-        envs.put("authenticationEnabled", "true");
-        envs.put("authenticationProviders", 
AuthenticationProviderToken.class.getName());
-        envs.put("authorizationEnabled", "true");
-        envs.put("brokerClientAuthenticationPlugin", 
AuthenticationToken.class.getName());
-        envs.put("brokerClientAuthenticationParameters", 
String.format("token:%s", proxyAuthToken));
-        envs.put("authenticationRefreshCheckSeconds", "1");
-        envs.put("forwardAuthorizationCredentials", "true");
-        envs.put("tokenPublicKey", "file://" + 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-        return envs;
-    }
-
-    protected void createKeysAndTokens(PulsarContainer container) throws 
Exception {
-        container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create-key-pair",
-                        "--output-private-key", 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--output-public-key", 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
-
-        byte[] publicKeyBytes = DockerUtils
-                .runCommandWithRawOutput(container.getDockerClient(), 
container.getContainerId(),
-                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
-                .getStdout();
-
-        publicKeyFile = File.createTempFile("public-", ".key", new 
File("/tmp"));
-        Files.write(publicKeyBytes, publicKeyFile);
-
-        clientAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
-                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", REGULAR_USER_ROLE)
-                .getStdout().trim();
-        log.info("Created client token: {}", clientAuthToken);
-
-        superUserAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
-                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", SUPER_USER_ROLE)
-                .getStdout().trim();
-        log.info("Created super-user token: {}", superUserAuthToken);
-
-        proxyAuthToken = container
-                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
-                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
-                        "--subject", PROXY_ROLE)
-                .getStdout().trim();
-        log.info("Created proxy token: {}", proxyAuthToken);
-    }
-
-    @Test
-    public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
-        @Cleanup
-        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
-                
.authentication(AuthenticationFactory.token(superUserAuthToken))
-                .build();
-
-        @Cleanup
-        PulsarAdmin clientAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
-                .authentication(AuthenticationFactory.token(clientAuthToken))
-                .build();
-
-        // create partitioned topic
-        superUserAdmin.topics().createPartitionedTopic("public/default/test", 
1);
-
-        // do some operation without grant any permissions
-        try {
-            
clientAdmin.topics().getInternalStats("public/default/test-partition-0", true);
-            fail("get internal stats and metadata operation should fail 
because the client hasn't permission to do");
-        } catch (PulsarAdminException e) {
-            assertEquals(e.getStatusCode(), 401);
-        }
-
-        // grant consume/produce permission to the role
-        superUserAdmin.topics().grantPermission("public/default/test",
-                REGULAR_USER_ROLE, Set.of(AuthAction.consume));
-
-        // then do some get internal stats and metadata operations again, it 
should success
-        PersistentTopicInternalStats internalStats = clientAdmin.topics()
-                .getInternalStats("public/default/test-partition-0", true);
-        assertNotNull(internalStats);
-    }
-}

Reply via email to