This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15e6655 Proxy roles enforcement (#1168)
15e6655 is described below
commit 15e665545b1610df362bf58d730bf3b8a2699308
Author: Jai Asher <[email protected]>
AuthorDate: Mon Feb 5 18:50:50 2018 -0800
Proxy roles enforcement (#1168)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 14 +-
.../broker/authorization/AuthorizationManager.java | 22 +-
.../apache/pulsar/broker/service/ServerCnx.java | 52 +++-
.../pulsar/broker/service/ServerCnxTest.java | 1 +
.../proxy/server/ProxyRolesEnforcementTest.java | 266 +++++++++++++++++++++
5 files changed, 343 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0cc4001..b542ee4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -193,6 +193,10 @@ public class ServiceConfiguration implements
PulsarConfiguration {
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();
+ // Role names that are treated as "proxy roles". If the broker sees a
request with
+ // role as proxyRoles - it will demand to see the original client role or
certificate.
+ private Set<String> proxyRoles = Sets.newTreeSet();
+
// Allow wildcard matching in authorization
// (wildcard matching only applicable if wildcard-char:
// * presents at first or last position eg: *.pulsar.service,
pulsar.service.*)
@@ -794,7 +798,15 @@ public class ServiceConfiguration implements
PulsarConfiguration {
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
-
+
+ public Set<String> getProxyRoles() {
+ return proxyRoles;
+ }
+
+ public void setProxyRoles(Set<String> proxyRoles) {
+ this.proxyRoles = proxyRoles;
+ }
+
public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 2bf7ce6..0950ae2 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -171,9 +171,12 @@ public class AuthorizationManager {
finalResult.complete(produceAuthorized);
return;
}
- } else if (log.isDebugEnabled()) {
- log.debug("Destination [{}] Role [{}] exception occured while
trying to check Produce permissions. {}",
- destination.toString(), role, ex.getMessage());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured
while trying to check Produce permissions. {}",
+ destination.toString(), role, ex.getMessage());
+ }
}
canConsumeAsync(destination, role,
null).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
@@ -181,10 +184,15 @@ public class AuthorizationManager {
finalResult.complete(consumeAuthorized);
return;
}
- } else if (log.isDebugEnabled()) {
- log.debug(
- "Destination [{}] Role [{}] exception occured
while trying to check Consume permissions. {}",
- destination.toString(), role, e.getMessage());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured
while trying to check Consume permissions. {}",
+ destination.toString(), role, e.getMessage());
+
+ }
+ finalResult.completeExceptionally(e);
+ return;
}
finalResult.complete(false);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8888c97..dfa4fd7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@ import static
org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import java.net.SocketAddress;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -77,6 +79,8 @@ import
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -89,7 +93,7 @@ public class ServerCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
- private String authRole = null;
+ String authRole = null;
// Max number of pending requests per connections. If multiple producers
are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write
spikes on the broker.
@@ -101,7 +105,8 @@ public class ServerCnx extends PulsarHandler {
private int nonPersistentPendingMessages = 0;
private final int MaxNonPersistentPendingMessages;
private String originalPrincipal = null;
-
+ private Set<String> proxyRoles = Sets.newHashSet();
+
enum State {
Start, Connected, Failed
}
@@ -117,6 +122,7 @@ public class ServerCnx extends PulsarHandler {
this.replicatorPrefix =
service.pulsar().getConfiguration().getReplicatorPrefix();
this.MaxNonPersistentPendingMessages =
service.pulsar().getConfiguration()
.getMaxConcurrentNonPersistentMessagePerConnection();
+ this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
}
@Override
@@ -180,6 +186,19 @@ public class ServerCnx extends PulsarHandler {
ctx.close();
}
+ private boolean validateOriginalPrincipal(String originalPrincipal,
ByteBuf errorResponse, String topicName,
+ String msg) {
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
+ && (StringUtils.isBlank(originalPrincipal) ||
proxyRoles.contains(originalPrincipal))) {
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
+ originalPrincipal, topicName);
+ ctx.writeAndFlush(errorResponse);
+ return false;
+ }
+
+ return true;
+ }
+
// ////
// // Incoming commands handling
// ////
@@ -196,6 +215,13 @@ public class ServerCnx extends PulsarHandler {
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = lookup.hasOriginalPrincipal() ?
lookup.getOriginalPrincipal()
: this.originalPrincipal;
+ if (!validateOriginalPrincipal(originalPrincipal,
+ newLookupErrorResponse(ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided for
lookup ", requestId),
+ topicName, "Valid Proxy Client role should be provided for
lookup ")) {
+ lookupSemaphore.release();
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null)
{
isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -253,9 +279,15 @@ public class ServerCnx extends PulsarHandler {
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
-
final String originalPrincipal =
partitionMetadata.hasOriginalPrincipal()
? partitionMetadata.getOriginalPrincipal() :
this.originalPrincipal;
+ if (!validateOriginalPrincipal(originalPrincipal,
+
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided for
getPartitionMetadataRequest ", requestId),
+ topicName, "Valid Proxy Client role should be provided for
getPartitionMetadataRequest ")) {
+ lookupSemaphore.release();
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null)
{
isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -417,7 +449,12 @@ public class ServerCnx extends PulsarHandler {
final String topicName = subscribe.getTopic();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
-
+ if (!validateOriginalPrincipal(originalPrincipal,
+ Commands.newError(requestId, ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided while
subscribing "),
+ topicName, "Valid Proxy Client role should be provided while
subscribing ")) {
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture =
service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
@@ -570,6 +607,13 @@ public class ServerCnx extends PulsarHandler {
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
+ if (!validateOriginalPrincipal(originalPrincipal,
+ Commands.newError(requestId, ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided while
creating producer "),
+ topicName, "Valid Proxy Client role should be provided while
creating producer ")) {
+ return;
+ }
+
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture =
service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 058a4da..b604cfd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1341,6 +1341,7 @@ public class ServerCnxTest {
channel.close().get();
}
serverCnx = new ServerCnx(brokerService);
+ serverCnx.authRole = "";
channel = new EmbeddedChannel(new
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
new file mode 100644
index 0000000..73bf11f
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+
+ public static class BasicAuthenticationData implements
AuthenticationDataProvider {
+ private String authParam;
+
+ public BasicAuthenticationData(String authParam) {
+ this.authParam = authParam;
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return authParam;
+ }
+
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set<Entry<String, String>> getHttpHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("BasicAuthentication", authParam);
+ return headers.entrySet();
+ }
+ }
+
+ public static class BasicAuthentication implements Authentication {
+
+ private String authParam;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws
PulsarClientException {
+ try {
+ return new BasicAuthenticationData(authParam);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ this.authParam = authParams.get("authParam");
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+ }
+
+ public static class BasicAuthenticationProvider implements
AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException
{
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
+ if (authData.hasDataFromCommand()) {
+ return authData.getCommandData();
+ } else if (authData.hasDataFromHttp()) {
+ return authData.getHttpHeader("BasicAuthentication");
+ }
+
+ return null;
+ }
+ }
+
+ private int webServicePort;
+ private int servicePort;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ webServicePort = PortManager.nextFreePort();
+ servicePort = PortManager.nextFreePort();
+ // enable authentication and authorization at broker (TLS is disabled)
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ conf.setTlsEnabled(false);
+
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ conf.setBrokerClientAuthenticationParameters("authParam:broker");
+
+ Set<String> superUserRoles = new HashSet<String>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<String>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName("use");
+ Set<String> proxyRoles = new HashSet<String>();
+ proxyRoles.add("proxy");
+ conf.setProxyRoles(proxyRoles);
+
+ super.init();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testIncorrectRoles() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ createAdminClient();
+ final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+ // create a client which connects to proxy and pass authData
+ String namespaceName = "my-property/use/my-ns";
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+ String clientAuthParams = "authParam:client";
+ String proxyAuthParams = "authParam:proxy";
+
+ admin.properties().createProperty("my-property",
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"),
Sets.newHashSet("use")));
+ admin.namespaces().createNamespace(namespaceName);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Step 2: Try to use proxy Client as a normal Client - expect
exception
+ PulsarClient proxyClient = createPulsarClient("pulsar://localhost:" +
BROKER_PORT, proxyAuthParams);
+ ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+ consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+ Consumer consumer;
+ boolean exceptionOccured = false;
+ try {
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ } catch(Exception ex) {
+ exceptionOccured = true;
+ }
+ Assert.assertTrue(exceptionOccured);
+
+ // Step 3: Run Pulsar Proxy and pass proxy params as client params -
expect exception
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+
+ proxyConfig.setServicePort(servicePort);
+ proxyConfig.setWebServicePort(webServicePort);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+
+
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ ProxyService proxyService = new ProxyService(proxyConfig);
+
+ proxyService.start();
+ proxyClient = createPulsarClient(proxyServiceUrl, proxyAuthParams);
+ try {
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ } catch(Exception ex) {
+ exceptionOccured = true;
+ }
+
+ Assert.assertTrue(exceptionOccured);
+
+ // Step 4: Pass correct client params
+ proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams);
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ proxyService.close();
+ }
+
+ private void createAdminClient() throws PulsarClientException {
+ String adminAuthParams = "authParam:admin";
+ org.apache.pulsar.client.api.ClientConfiguration clientConf = new
org.apache.pulsar.client.api.ClientConfiguration();
+ clientConf.setAuthentication(BasicAuthentication.class.getName(),
adminAuthParams);
+
+ admin = spy(new PulsarAdmin(brokerUrl, clientConf));
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl, String
authParams) throws PulsarClientException {
+ org.apache.pulsar.client.api.ClientConfiguration clientConf = new
org.apache.pulsar.client.api.ClientConfiguration();
+ clientConf.setAuthentication(BasicAuthentication.class.getName(),
authParams);
+ return PulsarClient.create(proxyServiceUrl, clientConf);
+ }
+}
--
To stop receiving notification emails like this one, please contact
[email protected].