This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new fe7ee88 [Issue 10221] Fix authorization error while using proxy and
`Prefix` subscription authentication mode (#10226)
fe7ee88 is described below
commit fe7ee885a153c105a41eb215a4673597b81286d2
Author: Shen Liu <[email protected]>
AuthorDate: Mon Apr 19 11:11:09 2021 +0800
[Issue 10221] Fix authorization error while using proxy and `Prefix`
subscription authentication mode (#10226)
Fixes #10221
### Motivation
When the Pulsar proxy is used together with the `Prefix` subscription authentication mode,
[org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync](https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L135)
throws an exception, which causes the consumer to fail.
### Modifications
Update the
`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync`
logic to check `isSuperUser` first and return `isAuthorizedFuture` only when the role is not a super user.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
(cherry picked from commit 889b9b8e5efc62d2d0cbc761205fba5759c97af0)
---
.../authorization/PulsarAuthorizationProvider.java | 18 +-
.../server/ProxyWithJwtAuthorizationTest.java | 286 +++++++++++++++++++++
2 files changed, 297 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ca7ca9e..e20c89c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -564,20 +564,24 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
break;
case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName,
role, authData, authData.getSubscription());
break;
- default: isAuthorizedFuture = FutureUtil.failedFuture(
- new IllegalStateException("TopicOperation is not
supported."));
+ default:
+ return FutureUtil.failedFuture(new
IllegalStateException("TopicOperation is not supported."));
}
CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role,
authData, conf);
+ // check isSuperUser first
return isSuperUserFuture
- .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized)
-> {
+ .thenCompose(isSuperUser -> {
if (log.isDebugEnabled()) {
- log.debug("Verify if role {} is allowed to {} to topic
{}:"
- + " isSuperUser={}, isAuthorized={}",
- role, operation, topicName, isSuperUser,
isAuthorized);
+ log.debug("Verify if role {} is allowed to {} to topic
{}: isSuperUser={}",
+ role, operation, topicName, isSuperUser);
+ }
+ if (isSuperUser) {
+ return CompletableFuture.completedFuture(true);
+ } else {
+ return isAuthorizedFuture;
}
- return isSuperUser || isAuthorized;
});
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
new file mode 100644
index 0000000..f683adf
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.crypto.SecretKey;
+
+public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+
+ private final String ADMIN_ROLE = "admin";
+ private final String PROXY_ROLE = "proxy";
+ private final String BROKER_ROLE = "broker";
+ private final String CLIENT_ROLE = "client";
+ private final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ private final String ADMIN_TOKEN =
Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+ private final String PROXY_TOKEN =
Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact();
+ private final String BROKER_TOKEN =
Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact();
+ private final String CLIENT_TOKEN =
Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact();
+
+ private ProxyService proxyService;
+ private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ // enable auth&auth and use JWT at broker
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ conf.getProperties().setProperty("tokenSecretKey", "data:;base64," +
Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add(ADMIN_ROLE);
+ superUserRoles.add(PROXY_ROLE);
+ superUserRoles.add(BROKER_ROLE);
+ conf.setSuperUserRoles(superUserRoles);
+
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN);
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName("proxy-authorization");
+ conf.setNumExecutorThreadPoolSize(5);
+
+ super.init();
+
+ // start proxy service
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setAuthorizationEnabled(false);
+ proxyConfig.getProperties().setProperty("tokenSecretKey",
"data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setWebServicePort(Optional.of(0));
+
+ // enable auth&auth and use JWT at proxy
+
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN);
+ proxyConfig.setAuthenticationProviders(providers);
+
+ proxyService = Mockito.spy(new ProxyService(proxyConfig,
+ new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ proxyService.close();
+ }
+
+ private void startProxy() throws Exception {
+ proxyService.start();
+ }
+
+ /**
+ * <pre>
+ * It verifies jwt + Authentication + Authorization (client -> proxy ->
broker).
+ *
+ * 1. client connects to proxy over jwt and pass auth-data
+ * 2. proxy authenticate client and retrieve client-role
+ * and send it to broker as originalPrincipal over jwt
+ * 3. client creates producer/consumer via proxy
+ * 4. broker authorize producer/consumer create request using
originalPrincipal
+ *
+ * </pre>
+ */
+ @Test
+ public void testProxyAuthorization() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ startProxy();
+ createAdminClient();
+ PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+ String namespaceName = "my-property/proxy-authorization/my-ns";
+
+ admin.clusters().createCluster("proxy-authorization", new
ClusterData(brokerUrl.toString()));
+
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("proxy-authorization")));
+ admin.namespaces().createNamespace(namespaceName);
+
+ Consumer<byte[]> consumer;
+ try {
+ consumer = proxyClient.newConsumer()
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ Assert.fail("should have failed with authorization error");
+ } catch (Exception ex) {
+ // excepted
+ admin.namespaces().grantPermissionOnNamespace(namespaceName,
CLIENT_ROLE,
+ Sets.newHashSet(AuthAction.consume));
+ log.info("-- Admin permissions {} ---",
admin.namespaces().getPermissions(namespaceName));
+ consumer = proxyClient.newConsumer()
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ }
+
+ Producer<byte[]> producer;
+ try {
+ producer = proxyClient.newProducer(Schema.BYTES)
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+ Assert.fail("should have failed with authorization error");
+ } catch (Exception ex) {
+ // excepted
+ admin.namespaces().grantPermissionOnNamespace(namespaceName,
CLIENT_ROLE,
+ Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+ log.info("-- Admin permissions {} ---",
admin.namespaces().getPermissions(namespaceName));
+ producer = proxyClient.newProducer(Schema.BYTES)
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+ }
+ final int msgs = 10;
+ for (int i = 0; i < msgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ int count = 0;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ count++;
+ }
+ // Acknowledge the consumption of all messages at once
+ Assert.assertEquals(msgs, count);
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ /**
+ * <pre>
+ * It verifies jwt + Authentication + Authorization (client -> proxy ->
broker).
+ * It also test `SubscriptionAuthMode.Prefix` mode.
+ *
+ * 1. client connects to proxy over jwt and pass auth-data
+ * 2. proxy authenticate client and retrieve client-role
+ * and send it to broker as originalPrincipal over jwt
+ * 3. client creates producer/consumer via proxy
+ * 4. broker authorize producer/consumer create request using
originalPrincipal
+ *
+ * </pre>
+ */
+ @Test
+ public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws
Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ startProxy();
+ createAdminClient();
+ PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+ String namespaceName = "my-property/proxy-authorization/my-ns";
+
+ admin.clusters().createCluster("proxy-authorization", new
ClusterData(brokerUrl.toString()));
+
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("proxy-authorization")));
+ admin.namespaces().createNamespace(namespaceName);
+ admin.namespaces().grantPermissionOnNamespace(namespaceName,
CLIENT_ROLE,
+ Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+ admin.namespaces().setSubscriptionAuthMode(namespaceName,
SubscriptionAuthMode.Prefix);
+
+ Consumer<byte[]> consumer;
+ try {
+ consumer = proxyClient.newConsumer()
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ Assert.fail("should have failed with authorization error");
+ } catch (Exception ex) {
+ // excepted
+ consumer = proxyClient.newConsumer()
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+ .subscriptionName(CLIENT_ROLE + "-sub1").subscribe();
+ }
+
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+ final int msgs = 10;
+ for (int i = 0; i < msgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ int count = 0;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ count++;
+ }
+ // Acknowledge the consumption of all messages at once
+ Assert.assertEquals(msgs, count);
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ private void createAdminClient() throws Exception {
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)).build());
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl,
ClientBuilder clientBuilder)
+ throws PulsarClientException {
+ return clientBuilder.serviceUrl(proxyServiceUrl).statsInterval(0,
TimeUnit.SECONDS)
+ .authentication(AuthenticationFactory.token(CLIENT_TOKEN))
+ .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+ }
+}