This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 72e9babe4c2 [improve][broker] Add fine-grain authorization to 
retention admin API (#22163)
72e9babe4c2 is described below

commit 72e9babe4c2ba8e2d1f59b296befb538a7c483b9
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Feb 29 18:57:03 2024 +0800

    [improve][broker] Add fine-grain authorization to retention admin API 
(#22163)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   9 +-
 .../broker/admin/TopicPoliciesAuthZTest.java       | 175 +++++++++++++++++++++
 .../security/{tls => }/MockedPulsarStandalone.java |  60 ++++++-
 .../tls/ec/TlsWithECCertificateFileTest.java       |   2 +-
 .../security/tls/ec/TlsWithECKeyStoreTest.java     |  10 +-
 5 files changed, 246 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f0f42a8ff4a..9094a4642a0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2435,7 +2435,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, 
PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetRetention(applied, isGlobal))
             .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2462,7 +2463,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Retention policies for the specified topic") 
RetentionPolicies retention) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, 
PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetRetention(retention, isGlobal))
             .thenRun(() -> {
                 try {
@@ -2498,7 +2500,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveRetention(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove retention: namespace={}, 
topic={}",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
new file mode 100644
index 00000000000..f07b9a6c2aa
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.awaitility.Awaitility.await;
+
+
+public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
+
+    private PulsarAdmin superUserAdmin;
+
+    private PulsarAdmin tenantManagerAdmin;
+
+    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @SneakyThrows
+    @BeforeClass
+    public void before() {
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start();
+        this.superUserAdmin =PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .build();
+        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+                .build();
+    }
+
+
+    @SneakyThrows
+    @AfterClass
+    public void after() {
+        close();
+    }
+
+
+    @SneakyThrows
+    @Test
+    public void testRetention() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        final RetentionPolicies definedRetentionPolicy = new 
RetentionPolicies(1, 1);
+        // test superuser
+        superUserAdmin.topicPolicies().setRetention(topic, 
definedRetentionPolicy);
+
+        // because the topic policies is eventual consistency, we should wait 
here
+        await().untilAsserted(() -> {
+            final RetentionPolicies receivedRetentionPolicy = 
superUserAdmin.topicPolicies().getRetention(topic);
+             Assert.assertEquals(receivedRetentionPolicy, 
definedRetentionPolicy);
+        });
+        superUserAdmin.topicPolicies().removeRetention(topic);
+
+        await().untilAsserted(() -> {
+            final RetentionPolicies retention = 
superUserAdmin.topicPolicies().getRetention(topic);
+            Assert.assertNull(retention);
+        });
+
+        // test tenant manager
+
+        tenantManagerAdmin.topicPolicies().setRetention(topic, 
definedRetentionPolicy);
+        await().untilAsserted(() -> {
+            final RetentionPolicies receivedRetentionPolicy = 
tenantManagerAdmin.topicPolicies().getRetention(topic);
+            Assert.assertEquals(receivedRetentionPolicy, 
definedRetentionPolicy);
+        });
+        tenantManagerAdmin.topicPolicies().removeRetention(topic);
+        await().untilAsserted(() -> {
+            final RetentionPolicies retention = 
tenantManagerAdmin.topicPolicies().getRetention(topic);
+            Assert.assertNull(retention);
+        });
+
+        // test nobody
+
+        try {
+            subAdmin.topicPolicies().getRetention(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setRetention(topic, 
definedRetentionPolicy);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeRetention(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions
+        for (AuthAction action : AuthAction.values()) {
+            
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getRetention(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setRetention(topic, 
definedRetentionPolicy);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeRetention(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+            
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", 
subject);
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
similarity index 76%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
index 1a7e806f0e6..421727c0ed7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -16,25 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.security.tls;
+package org.apache.pulsar.security;
 
 import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import javax.crypto.SecretKey;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
 
 
 public abstract class MockedPulsarStandalone implements AutoCloseable {
@@ -60,6 +71,50 @@ public abstract class MockedPulsarStandalone implements 
AutoCloseable {
         serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
     }
 
+
+    protected static final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private static final String BROKER_INTERNAL_CLIENT_SUBJECT = 
"broker_internal";
+    private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+            .claim("sub", 
BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+    protected static final String SUPER_USER_SUBJECT = "super-user";
+    protected static final String SUPER_USER_TOKEN = Jwts.builder()
+            .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+    protected static final String NOBODY_SUBJECT =  "nobody";
+    protected static final String NOBODY_TOKEN = Jwts.builder()
+            .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
+
+
+    @SneakyThrows
+    protected void configureTokenAuthentication() {
+        serviceConfiguration.setAuthenticationEnabled(true);
+        
serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        // internal client
+        
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        final Map<String, String> brokerClientAuthParams = new HashMap<>();
+        brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+        final String brokerClientAuthParamStr = 
MAPPER.writeValueAsString(brokerClientAuthParams);
+        
serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+
+        Properties properties = serviceConfiguration.getProperties();
+        if (properties == null) {
+            properties = new Properties();
+            serviceConfiguration.setProperties(properties);
+        }
+        properties.put("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+    }
+
+
+
+    protected void configureDefaultAuthorization() {
+        serviceConfiguration.setAuthorizationEnabled(true);
+        
serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, 
BROKER_INTERNAL_CLIENT_SUBJECT));
+    }
+
+
+
     @SneakyThrows
     protected void loadECTlsCertificateWithFile() {
         serviceConfiguration.setTlsEnabled(true);
@@ -176,4 +231,7 @@ public abstract class MockedPulsarStandalone implements 
AutoCloseable {
     protected static final String TLS_EC_KS_TRUSTED_STORE =
             getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks");
     protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw";
+
+
+    private static final ObjectMapper MAPPER = 
ObjectMapperFactory.getMapper().getObjectMapper();
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
index 39d9b7326d1..b02b10f5996 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
@@ -25,7 +25,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
-import org.apache.pulsar.security.tls.MockedPulsarStandalone;
+import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
index e39ad67e4a9..c6ff16d4cc5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.security.tls.ec;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
-import org.apache.pulsar.security.tls.MockedPulsarStandalone;
+import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -35,10 +39,6 @@ import 
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 
 
 @Test

Reply via email to