This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0cc42d9130d6245a3b278ff183cb72be4c1a4927 Author: Zixuan Liu <[email protected]> AuthorDate: Mon Jul 25 17:57:19 2022 +0800 [fix][authorization] Fix multiple roles authorization (#16645) (cherry picked from commit d8483d48cb21e8e99fd56c786e5198f7fe7135f6) --- .../MultiRolesTokenAuthorizationProvider.java | 86 ++++++-- .../authorization/PulsarAuthorizationProvider.java | 3 +- .../MultiRolesTokenAuthorizationProviderTest.java | 231 +++++++++++++++++++++ 3 files changed, 307 insertions(+), 13 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java index b8f46a52483..d72c951c889 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java @@ -26,9 +26,12 @@ import io.jsonwebtoken.RequiredTypeException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -38,9 +41,12 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.RestException; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,56 +85,112 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro super.initialize(conf, pulsarResources); } - private List<String> getRoles(AuthenticationDataSource authData) { + @Override + public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + Set<String> roles = getRoles(authenticationData); + if (roles.isEmpty()) { + return CompletableFuture.completedFuture(false); + } + Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles(); + if (superUserRoles.isEmpty()) { + return CompletableFuture.completedFuture(false); + } + + return CompletableFuture.completedFuture(roles.stream().anyMatch(superUserRoles::contains)); + } + + @Override + public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String role, + AuthenticationDataSource authData) { + return isSuperUser(role, authData, conf) + .thenCompose(isSuperUser -> { + if (isSuperUser) { + return CompletableFuture.completedFuture(true); + } + Set<String> roles = getRoles(authData); + if (roles.isEmpty()) { + return CompletableFuture.completedFuture(false); + } + + return pulsarResources.getTenantResources() + .getTenantAsync(tenantName) + .thenCompose(op -> { + if (op.isPresent()) { + TenantInfo tenantInfo = op.get(); + if (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()) { + return CompletableFuture.completedFuture(false); + } + + return CompletableFuture.completedFuture(roles.stream() + .anyMatch(n -> tenantInfo.getAdminRoles().contains(n))); + } else { + throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"); + } + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof MetadataStoreException.NotFoundException) { + log.warn("Failed to get tenant info data for non existing tenant {}", tenantName); + throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"); + } + log.error("Failed to get tenant {}", tenantName, cause); + throw new RestException(cause); + }); + }); + } + + private Set<String> getRoles(AuthenticationDataSource authData) { String token = null; if (authData.hasDataFromCommand()) { // Authenticate Pulsar binary connection token = authData.getCommandData(); if (StringUtils.isBlank(token)) { - return Collections.emptyList(); + return Collections.emptySet(); } } else if (authData.hasDataFromHttp()) { // The format here should be compliant to RFC-6750 // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME); if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) { - return Collections.emptyList(); + return Collections.emptySet(); } // Remove prefix token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length()); } - if (token == null) - return Collections.emptyList(); + if (token == null) { + return Collections.emptySet(); + } String[] splitToken = token.split("\\."); if (splitToken.length < 2) { log.warn("Unable to extract additional roles from JWT token"); - return Collections.emptyList(); + return Collections.emptySet(); } String unsignedToken = splitToken[0] + "." + splitToken[1] + "."; Jwt<?, Claims> jwt = parser.parseClaimsJwt(unsignedToken); try { - return Collections.singletonList(jwt.getBody().get(roleClaim, String.class)); + return new HashSet<>(Collections.singletonList(jwt.getBody().get(roleClaim, String.class))); } catch (RequiredTypeException requiredTypeException) { try { List list = jwt.getBody().get(roleClaim, List.class); if (list != null) { - return list; + return new HashSet<String>(list); } } catch (RequiredTypeException requiredTypeException1) { - return Collections.emptyList(); + return Collections.emptySet(); } } - return Collections.emptyList(); + return Collections.emptySet(); } - public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, CompletableFuture<Boolean>> authorizeFunc) { - List<String> roles = getRoles(authenticationData); + public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, + CompletableFuture<Boolean>> authorizeFunc) { + Set<String> roles = getRoles(authenticationData); if (roles.isEmpty()) { return CompletableFuture.completedFuture(false); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 097464bfb5f..b753d2ed634 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -59,7 +59,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class); public ServiceConfiguration conf; - private PulsarResources pulsarResources; + + protected PulsarResources pulsarResources; public PulsarAuthorizationProvider() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java new file mode 100644 index 00000000000..12d7c71358b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.authorization.MultiRolesTokenAuthorizationProvider; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MultiRolesTokenAuthorizationProviderTest extends MockedPulsarServiceBaseTest { + + private final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private final String superUserToken; + private final String normalUserToken; + + public MultiRolesTokenAuthorizationProviderTest() { + Map<String, Object> claims = new HashMap<>(); + Set<String> roles = new HashSet<>(); + roles.add("user1"); + roles.add("superUser"); + claims.put("roles", roles); + superUserToken = Jwts.builder() + .setClaims(claims) + .signWith(secretKey) + .compact(); + + roles = new HashSet<>(); + roles.add("normalUser"); + roles.add("user2"); + roles.add("user5"); + claims.put("roles", roles); + normalUserToken = Jwts.builder() + .setClaims(claims) + .signWith(secretKey) + .compact(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + Set<String> superUserRoles = new HashSet<>(); + superUserRoles.add("superUser"); + conf.setSuperUserRoles(superUserRoles); + + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", + "data:;base64," + Base64.getEncoder().encodeToString(secretKey.getEncoded())); + properties.setProperty("tokenAuthClaim", "roles"); + conf.setProperties(properties); + + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters(superUserToken); + + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + conf.setAuthorizationProvider(MultiRolesTokenAuthorizationProvider.class.getName()); + + conf.setClusterName(configClusterName); + conf.setNumExecutorThreadPoolSize(5); + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster(configClusterName, + ClusterData.builder() + .brokerServiceUrl(brokerUrl.toString()) + .serviceUrl(getPulsar().getWebServiceAddress()) + .build() + ); + } + + @BeforeClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.authentication(new AuthenticationToken(superUserToken)); + } + + @Override + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + pulsarAdminBuilder.authentication(new AuthenticationToken(superUserToken)); + } + + private PulsarAdmin newPulsarAdmin(String token) throws PulsarClientException { + return PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .requestTimeout(3, TimeUnit.SECONDS) + .build(); + } + + private PulsarClient newPulsarClient(String token) throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new AuthenticationToken(token)) + .operationTimeout(3, TimeUnit.SECONDS) + .build(); + } + + @Test + public void testAdminRequestWithSuperUserToken() throws Exception { + String tenant = "superuser-admin-tenant"; + @Cleanup + PulsarAdmin admin = newPulsarAdmin(superUserToken); + admin.tenants().createTenant(tenant, TenantInfo.builder() + .allowedClusters(Sets.newHashSet(configClusterName)).build()); + String namespace = "superuser-admin-namespace"; + admin.namespaces().createNamespace(tenant + "/" + namespace); + admin.brokers().getAllDynamicConfigurations(); + admin.tenants().getTenants(); + admin.topics().getList(tenant + "/" + namespace); + } + + @Test + public void testProduceAndConsumeWithSuperUserToken() throws Exception { + String tenant = "superuser-client-tenant"; + @Cleanup + PulsarAdmin admin = newPulsarAdmin(superUserToken); + admin.tenants().createTenant(tenant, TenantInfo.builder() + .allowedClusters(Sets.newHashSet(configClusterName)).build()); + String namespace = "superuser-client-namespace"; + admin.namespaces().createNamespace(tenant + "/" + namespace); + String topic = tenant + "/" + namespace + "/" + "test-topic"; + + @Cleanup + PulsarClient client = newPulsarClient(superUserToken); + @Cleanup + Producer<byte[]> producer = client.newProducer().topic(topic).create(); + byte[] body = "hello".getBytes(StandardCharsets.UTF_8); + producer.send(body); + + @Cleanup + Consumer<byte[]> consumer = client.newConsumer().topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(message.getData(), body); + } + + @Test + public void testAdminRequestWithNormalUserToken() throws Exception { + String tenant = "normaluser-admin-tenant"; + @Cleanup + PulsarAdmin admin = newPulsarAdmin(normalUserToken); + + assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> admin.tenants().createTenant(tenant, TenantInfo.builder() + .allowedClusters(Sets.newHashSet(configClusterName)).build())); + } + + @Test + public void testProduceAndConsumeWithNormalUserToken() throws Exception { + String tenant = "normaluser-client-tenant"; + @Cleanup + PulsarAdmin admin = newPulsarAdmin(superUserToken); + admin.tenants().createTenant(tenant, TenantInfo.builder() + .allowedClusters(Sets.newHashSet(configClusterName)).build()); + String namespace = "normaluser-client-namespace"; + admin.namespaces().createNamespace(tenant + "/" + namespace); + String topic = tenant + "/" + namespace + "/" + "test-topic"; + + @Cleanup + PulsarClient client = newPulsarClient(normalUserToken); + assertThrows(PulsarClientException.AuthorizationException.class, () -> { + @Cleanup + Producer<byte[]> ignored = client.newProducer().topic(topic).create(); + }); + + assertThrows(PulsarClientException.AuthorizationException.class, () -> { + @Cleanup + Consumer<byte[]> ignored = client.newConsumer().topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + }); + } +}
