This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4e0e772d21a714656b5861ccbdc6a0909521f757
Author: Rui Fu <[email protected]>
AuthorDate: Fri Nov 5 09:30:57 2021 +0800

    allow consume permission to do GetTopics op (#12600)
    
    Fixes #12423
    
    ### Motivation
    A regex subscription needs to fetch the topic list of a namespace with a 
GetTopicsOfNamespace request, but that request requires tenant admin permission, 
which blocks regex consumers that only have consume permission.
    
    ### Modifications
    This PR adds a consume permission check for GetTopicsOfNamespace, which 
allows consumers to get the topic list with only the consume permission.
    
    (cherry picked from commit 7e078aad5cb0b07f5e0d609025cf13b934cf28eb)
---
 .../authorization/AuthorizationProvider.java       |  10 +
 .../MultiRolesTokenAuthorizationProvider.java      |   5 +
 .../authorization/PulsarAuthorizationProvider.java |   8 +
 .../broker/auth/MockAuthorizationProvider.java     |   6 +
 .../api/AuthorizationProducerConsumerTest.java     |   5 +
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   5 +
 .../admin/GetTopicsOfNamespaceWithAuthTest.java    | 208 +++++++++++++++++++++
 7 files changed, 247 insertions(+)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 443a2a8..6e930a4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -164,6 +164,16 @@ public interface AuthorizationProvider extends Closeable {
                                                  AuthenticationDataSource 
authenticationData);
 
     /**
+     * Allow consume operations within this namespace.
+     * @param namespaceName The namespace that the consume operations can be 
executed in
+     * @param role The role to check
+     * @param authenticationData authentication data related to the role
+     * @return a boolean to determine whether authorized or not
+     */
+    CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role,
+                                                 AuthenticationDataSource 
authenticationData);
+
+    /**
      *
      * Grant authorization-action permission on a namespace to the given client
      *
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index 23fb7bc..0f6c2cb 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -210,6 +210,11 @@ public class MultiRolesTokenAuthorizationProvider extends 
PulsarAuthorizationPro
     }
 
     @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> 
super.allowConsumeOpsAsync(namespaceName, r, authenticationData));
+    }
+
+    @Override
     public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName,
                                                                 String role,
                                                                 
TenantOperation operation,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 60e02fc..e339987 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -235,6 +235,11 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return allowTheSpecifiedActionOpsAsync(namespaceName, role, 
authenticationData, AuthAction.sinks);
     }
 
+    @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, 
authenticationData, AuthAction.consume);
+    }
+
     private CompletableFuture<Boolean> 
allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
                                                                        
AuthenticationDataSource authenticationData,
                                                                        
AuthAction authAction) {
@@ -540,6 +545,9 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
             case PACKAGES:
                 isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.packages);
                 break;
+            case GET_TOPICS:
+                isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, 
authData);
+                break;
             default:
                 isAuthorizedFuture = CompletableFuture.completedFuture(false);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
index 421da4f..ae9480b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -105,6 +105,12 @@ public class MockAuthorizationProvider implements 
AuthorizationProvider {
     }
 
     @Override
+    public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role,
+                                                    AuthenticationDataSource 
authenticationData) {
+        return roleAuthorizedAsync(role);
+    }
+
+    @Override
     public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions, String role,
                                                         String authDataJson) {
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e346086..f94f6b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -498,6 +498,11 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         @Override
+        public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions,
                 String role, String authenticationData) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index af47c79..adb427d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -297,6 +297,11 @@ public class PatternTopicsConsumerImplAuthTest extends 
ProducerConsumerBase {
         }
 
         @Override
+        public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName 
namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions,
                                                             String role, 
String authenticationData) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java
new file mode 100644
index 0000000..68de70d
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetTopicsOfNamespaceWithAuthTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.elasticsearch.common.collect.Set;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * GetTopicsOfNamespaceWithAuthTest will test GetTopics operation with and 
without the proper permission.
+ */
+@Slf4j
+public class GetTopicsOfNamespaceWithAuthTest extends TestRetrySupport {
+
+    private static final String CLUSTER_PREFIX = "get-topics-auth";
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = 
"/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = 
"/tmp/public.key";
+
+    private static final String SUPER_USER_ROLE = "super-user";
+    private String superUserAuthToken;
+    private static final String PROXY_ROLE = "proxy";
+    private String proxyAuthToken;
+    private static final String REGULAR_USER_ROLE = "client";
+    private String clientAuthToken;
+    private File publicKeyFile;
+
+    private PulsarCluster pulsarCluster;
+    private PulsarContainer cmdContainer;
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+        // Before starting the cluster, generate the secret key and the token
+        // Use Zk container to have 1 container available before starting the 
cluster
+        final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, 
RandomStringUtils.randomAlphabetic(6));
+        final String cliContainerName = String.format("%s-%s", "cli", 
RandomStringUtils.randomAlphabetic(6));
+        cmdContainer = new ZKContainer<>(cliContainerName);
+        cmdContainer
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(ZKContainer.NAME)
+                .withEnv("zkServers", ZKContainer.NAME);
+        cmdContainer.start();
+
+        createKeysAndTokens(cmdContainer);
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(2)
+                .numProxies(1)
+                .clusterName(clusterName)
+                .brokerEnvs(getBrokerSettingsEnvs())
+                .proxyEnvs(getProxySettingsEnvs())
+                
.brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), 
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                
.proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), 
PUBLIC_KEY_PATH_INSIDE_CONTAINER))
+                .build();
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        markCurrentSetupNumberCleaned();
+        if (cmdContainer != null) {
+            cmdContainer.stop();
+        }
+        if (pulsarCluster != null) {
+            pulsarCluster.stop();
+        }
+    }
+
+    private Map<String, String> getBrokerSettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", 
AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, 
PROXY_ROLE));
+        envs.put("brokerClientAuthenticationPlugin", 
AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", 
String.format("token:%s", superUserAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("authenticateOriginalAuthData", "true");
+        envs.put("tokenPublicKey", "file://" + 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    private Map<String, String> getProxySettingsEnvs() {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("authenticationEnabled", "true");
+        envs.put("authenticationProviders", 
AuthenticationProviderToken.class.getName());
+        envs.put("authorizationEnabled", "true");
+        envs.put("brokerClientAuthenticationPlugin", 
AuthenticationToken.class.getName());
+        envs.put("brokerClientAuthenticationParameters", 
String.format("token:%s", proxyAuthToken));
+        envs.put("authenticationRefreshCheckSeconds", "1");
+        envs.put("forwardAuthorizationCredentials", "true");
+        envs.put("tokenPublicKey", "file://" + 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        return envs;
+    }
+
+    protected void createKeysAndTokens(PulsarContainer container) throws 
Exception {
+        container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create-key-pair",
+                        "--output-private-key", 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--output-public-key", 
PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        byte[] publicKeyBytes = DockerUtils
+                .runCommandWithRawOutput(container.getDockerClient(), 
container.getContainerId(),
+                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+                .getStdout();
+
+        publicKeyFile = File.createTempFile("public-", ".key", new 
File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
+                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", REGULAR_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
+                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", SUPER_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", 
"create",
+                        "--private-key", "file://" + 
PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", PROXY_ROLE)
+                .getStdout().trim();
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    @Test
+    public void testGetTopicsOfNamespaceOpsWithConsumePermission() throws 
Exception {
+        @Cleanup
+        PulsarAdmin superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                
.authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        @Cleanup
+        PulsarAdmin clientAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(clientAuthToken))
+                .build();
+
+        // attempt the operation before any permission has been granted
+        try {
+            clientAdmin.namespaces().getTopics("public/default");
+            fail("list topics operation should fail because the client hasn't 
permission to do");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 401);
+        }
+
+        // grant consume permission to the role
+        
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                REGULAR_USER_ROLE, Set.of(AuthAction.consume));
+
+        // then list the topics again; this time it should succeed
+        List<String> topics = 
clientAdmin.namespaces().getTopics("public/default");
+        assertEquals(topics.size(), 0);
+    }
+}

Reply via email to