This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 556e4e03a2f24ab51a4d4de55a1465ec49216bb5
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Mar 12 11:20:57 2022 +0800

    Fix wrong prompt exception when getting non-persistent topic list without 
GET_BUNDLE permission (#14638)
    
    Fixes #14191
    
    ### Motivation
    We have some big issues with the permission part. We only have the 
permissions mentioned in the 
[doc](https://pulsar.apache.org/docs/en/admin-api-permissions/). But if users 
follow the doc, they will face the same issue that #14191 describes. We don't 
have GET_BUNDLE in the grant interface, yet the prompt message shown to the 
user refers to it. And currently, only the admin role can have the permission.
    This PR does not solve the permission issue; it fixes the prompt message 
first, so that the user no longer gets a 500 error. I have also opened an 
issue, https://github.com/apache/pulsar/issues/14639, to discuss refactoring 
the permission part.
    
    ### Modification
    
    - Return 403 to the user when permission is denied.
    
    (cherry picked from commit ca6e82422ecbc5272d3dda2c90873c5a493764e1)
---
 .../broker/admin/v1/NonPersistentTopics.java       | 25 +++++++-------
 .../broker/admin/v2/NonPersistentTopics.java       | 31 ++++++++---------
 .../pulsar/broker/auth/AuthorizationTest.java      | 39 ++++++++++++++++++++--
 3 files changed, 62 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index c5d5870642b..e2ed1ee8cd2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -196,6 +196,7 @@ public class NonPersistentTopics extends PersistentTopics {
         Policies policies = null;
         NamespaceName nsName = null;
         try {
+            validateNamespaceName(property, cluster, namespace);
             validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
             policies = getNamespacePolicies(property, cluster, namespace);
             nsName = NamespaceName.get(property, cluster, namespace);
@@ -230,22 +231,19 @@ public class NonPersistentTopics extends PersistentTopics 
{
             }
         }
 
-        final List<String> topics = Lists.newArrayList();
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            for (int i = 0; i < futures.size(); i++) {
-                try {
-                    if (futures.get(i).isDone() && futures.get(i).get() != 
null) {
-                        topics.addAll(futures.get(i).get());
+        FutureUtil.waitForAll(futures).whenComplete((result, ex) -> {
+            if (ex != null) {
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+            } else {
+                final List<String> topics = Lists.newArrayList();
+                for (int i = 0; i < futures.size(); i++) {
+                    List<String> topicList = futures.get(i).join();
+                    if (topicList != null) {
+                        topics.addAll(topicList);
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error("[{}] Failed to get list of topics under 
namespace {}/{}/{}", clientAppId(), property,
-                            cluster, namespace, e);
-                    asyncResponse.resume(new RestException(e instanceof 
ExecutionException ? e.getCause() : e));
-                    return null;
                 }
+                asyncResponse.resume(topics);
             }
-            asyncResponse.resume(topics);
-            return null;
         });
     }
 
@@ -262,6 +260,7 @@ public class NonPersistentTopics extends PersistentTopics {
                                           @PathParam("bundle") String 
bundleRange) {
         log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}", 
clientAppId(), property, cluster, namespace,
                 bundleRange);
+        validateNamespaceName(property, cluster, namespace);
         validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_BUNDLE);
         Policies policies = getNamespacePolicies(property, cluster, namespace);
         if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index d390049b405..8a0f123d2ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -403,26 +403,23 @@ public class NonPersistentTopics extends PersistentTopics 
{
             }
         }
 
-        final List<String> topics = Lists.newArrayList();
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            for (int i = 0; i < futures.size(); i++) {
-                try {
-                    if (futures.get(i).isDone() && futures.get(i).get() != 
null) {
-                        topics.addAll(futures.get(i).get());
+        FutureUtil.waitForAll(futures).whenComplete((result, ex) -> {
+            if (ex != null) {
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+            } else {
+                final List<String> topics = Lists.newArrayList();
+                for (int i = 0; i < futures.size(); i++) {
+                    List<String> topicList = futures.get(i).join();
+                    if (topicList != null) {
+                        topics.addAll(topicList);
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error("[{}] Failed to get list of topics under 
namespace {}", clientAppId(), namespaceName, e);
-                    asyncResponse.resume(new RestException(e instanceof 
ExecutionException ? e.getCause() : e));
-                    return null;
                 }
+                final List<String> nonPersistentTopics =
+                        topics.stream()
+                                .filter(name -> 
!TopicName.get(name).isPersistent())
+                                .collect(Collectors.toList());
+                asyncResponse.resume(nonPersistentTopics);
             }
-
-            final List<String> nonPersistentTopics =
-                    topics.stream()
-                            .filter(name -> 
!TopicName.get(name).isPersistent())
-                            .collect(Collectors.toList());
-            asyncResponse.resume(nonPersistentTopics);
-            return null;
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index dcdc602985b..7bbbc9072f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -18,14 +18,17 @@
  */
 package org.apache.pulsar.broker.auth;
 
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import java.util.EnumSet;
-
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -35,7 +38,6 @@ import 
org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
 import com.google.common.collect.Sets;
 
 @Test(groups = "flaky")
@@ -224,6 +226,37 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
         admin.clusters().deleteCluster("c1");
     }
 
+    @Test
+    public void testGetListWithoutGetBundleOp() throws Exception {
+        String tenant = "p1";
+        String namespaceV1 = "p1/global/ns1";
+        String namespaceV2 = "p1/ns2";
+        admin.clusters().createCluster("c1", ClusterData.builder().build());
+        admin.tenants().createTenant(tenant, new 
TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
+        admin.namespaces().createNamespace(namespaceV1, Sets.newHashSet("c1"));
+        admin.namespaces().grantPermissionOnNamespace(namespaceV1, 
"pass.pass2", EnumSet.of(AuthAction.produce));
+        admin.namespaces().createNamespace(namespaceV2, Sets.newHashSet("c1"));
+        admin.namespaces().grantPermissionOnNamespace(namespaceV2, 
"pass.pass2", EnumSet.of(AuthAction.produce));
+        PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != 
null
+                        ? brokerUrl.toString()
+                        : brokerUrlTls.toString())
+                .authentication(new MockAuthentication("pass.pass2"))
+                .build();
+        when(pulsar.getAdminClient()).thenReturn(admin2);
+        try {
+            admin2.topics().getList(namespaceV1, TopicDomain.non_persistent);
+        } catch (Exception ex) {
+            assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            assertEquals(ex.getMessage(), "Unauthorized to 
validateNamespaceOperation for operation [GET_BUNDLE] on namespace 
[p1/global/ns1]");
+        }
+        try {
+            admin2.topics().getList(namespaceV2, TopicDomain.non_persistent);
+        } catch (Exception ex) {
+            assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            assertEquals(ex.getMessage(), "Unauthorized to 
validateNamespaceOperation for operation [GET_BUNDLE] on namespace [p1/ns2]");
+        }
+    }
+
     private static void waitForChange() {
         try {
             Thread.sleep(100);

Reply via email to