This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 72e9babe4c2 [improve][broker] Add fine-grain authorization to
retention admin API (#22163)
72e9babe4c2 is described below
commit 72e9babe4c2ba8e2d1f59b296befb538a7c483b9
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Feb 29 18:57:03 2024 +0800
[improve][broker] Add fine-grain authorization to retention admin API
(#22163)
---
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 +-
.../broker/admin/TopicPoliciesAuthZTest.java | 175 +++++++++++++++++++++
.../security/{tls => }/MockedPulsarStandalone.java | 60 ++++++-
.../tls/ec/TlsWithECCertificateFileTest.java | 2 +-
.../security/tls/ec/TlsWithECKeyStoreTest.java | 10 +-
5 files changed, 246 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f0f42a8ff4a..9094a4642a0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2435,7 +2435,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2462,7 +2463,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Retention policies for the specified topic")
RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
.thenRun(() -> {
try {
@@ -2498,7 +2500,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveRetention(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove retention: namespace={},
topic={}",
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
new file mode 100644
index 00000000000..f07b9a6c2aa
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.awaitility.Awaitility.await;
+
+
+public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
+
+ private PulsarAdmin superUserAdmin;
+
+ private PulsarAdmin tenantManagerAdmin;
+
+ private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+ .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+ @SneakyThrows
+ @BeforeClass
+ public void before() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin =PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+
+ @SneakyThrows
+ @AfterClass
+ public void after() {
+ close();
+ }
+
+
+ @SneakyThrows
+ @Test
+ public void testRetention() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ final RetentionPolicies definedRetentionPolicy = new
RetentionPolicies(1, 1);
+ // test superuser
+ superUserAdmin.topicPolicies().setRetention(topic,
definedRetentionPolicy);
+
+ // topic policies are eventually consistent, so wait until the change becomes visible
+ await().untilAsserted(() -> {
+ final RetentionPolicies receivedRetentionPolicy =
superUserAdmin.topicPolicies().getRetention(topic);
+ Assert.assertEquals(receivedRetentionPolicy,
definedRetentionPolicy);
+ });
+ superUserAdmin.topicPolicies().removeRetention(topic);
+
+ await().untilAsserted(() -> {
+ final RetentionPolicies retention =
superUserAdmin.topicPolicies().getRetention(topic);
+ Assert.assertNull(retention);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setRetention(topic,
definedRetentionPolicy);
+ await().untilAsserted(() -> {
+ final RetentionPolicies receivedRetentionPolicy =
tenantManagerAdmin.topicPolicies().getRetention(topic);
+ Assert.assertEquals(receivedRetentionPolicy,
definedRetentionPolicy);
+ });
+ tenantManagerAdmin.topicPolicies().removeRetention(topic);
+ await().untilAsserted(() -> {
+ final RetentionPolicies retention =
tenantManagerAdmin.topicPolicies().getRetention(topic);
+ Assert.assertNull(retention);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setRetention(topic,
definedRetentionPolicy);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setRetention(topic,
definedRetentionPolicy);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default",
subject);
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
similarity index 76%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
index 1a7e806f0e6..421727c0ed7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -16,25 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.security.tls;
+package org.apache.pulsar.security;
import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import javax.crypto.SecretKey;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
public abstract class MockedPulsarStandalone implements AutoCloseable {
@@ -60,6 +71,50 @@ public abstract class MockedPulsarStandalone implements
AutoCloseable {
serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
}
+
+ protected static final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ private static final String BROKER_INTERNAL_CLIENT_SUBJECT =
"broker_internal";
+ private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+ .claim("sub",
BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+ protected static final String SUPER_USER_SUBJECT = "super-user";
+ protected static final String SUPER_USER_TOKEN = Jwts.builder()
+ .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+ protected static final String NOBODY_SUBJECT = "nobody";
+ protected static final String NOBODY_TOKEN = Jwts.builder()
+ .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
+
+
+ @SneakyThrows
+ protected void configureTokenAuthentication() {
+ serviceConfiguration.setAuthenticationEnabled(true);
+
serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+ // internal client
+
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ final Map<String, String> brokerClientAuthParams = new HashMap<>();
+ brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+ final String brokerClientAuthParamStr =
MAPPER.writeValueAsString(brokerClientAuthParams);
+
serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+
+ Properties properties = serviceConfiguration.getProperties();
+ if (properties == null) {
+ properties = new Properties();
+ serviceConfiguration.setProperties(properties);
+ }
+ properties.put("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+ }
+
+
+
+ protected void configureDefaultAuthorization() {
+ serviceConfiguration.setAuthorizationEnabled(true);
+
serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT,
BROKER_INTERNAL_CLIENT_SUBJECT));
+ }
+
+
+
@SneakyThrows
protected void loadECTlsCertificateWithFile() {
serviceConfiguration.setTlsEnabled(true);
@@ -176,4 +231,7 @@ public abstract class MockedPulsarStandalone implements
AutoCloseable {
protected static final String TLS_EC_KS_TRUSTED_STORE =
getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks");
protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw";
+
+
+ private static final ObjectMapper MAPPER =
ObjectMapperFactory.getMapper().getObjectMapper();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
index 39d9b7326d1..b02b10f5996 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java
@@ -25,7 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
-import org.apache.pulsar.security.tls.MockedPulsarStandalone;
+import org.apache.pulsar.security.MockedPulsarStandalone;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
index e39ad67e4a9..c6ff16d4cc5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.security.tls.ec;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
-import org.apache.pulsar.security.tls.MockedPulsarStandalone;
+import org.apache.pulsar.security.MockedPulsarStandalone;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -35,10 +39,6 @@ import
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
@Test