This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15e6655  Proxy roles enforcement (#1168)
15e6655 is described below

commit 15e665545b1610df362bf58d730bf3b8a2699308
Author: Jai Asher <[email protected]>
AuthorDate: Mon Feb 5 18:50:50 2018 -0800

    Proxy roles enforcement (#1168)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  14 +-
 .../broker/authorization/AuthorizationManager.java |  22 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  52 +++-
 .../pulsar/broker/service/ServerCnxTest.java       |   1 +
 .../proxy/server/ProxyRolesEnforcementTest.java    | 266 +++++++++++++++++++++
 5 files changed, 343 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0cc4001..b542ee4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -193,6 +193,10 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // do all admin operations and publish/consume from all topics
     private Set<String> superUserRoles = Sets.newTreeSet();
 
+    // Role names that are treated as "proxy roles". If the broker sees a 
request with
+    // role as proxyRoles - it will demand to see the original client role or 
certificate.
+    private Set<String> proxyRoles = Sets.newTreeSet();
+
     // Allow wildcard matching in authorization
     // (wildcard matching only applicable if wildcard-char:
     // * presents at first or last position eg: *.pulsar.service, 
pulsar.service.*)
@@ -794,7 +798,15 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     public Set<String> getSuperUserRoles() {
         return superUserRoles;
     }
-
+ 
+    public Set<String> getProxyRoles() {
+        return proxyRoles;
+    }
+    
+    public void setProxyRoles(Set<String> proxyRoles) {
+        this.proxyRoles = proxyRoles;
+    }
+    
     public boolean getAuthorizationAllowWildcardsMatching() {
         return authorizationAllowWildcardsMatching;
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 2bf7ce6..0950ae2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -171,9 +171,12 @@ public class AuthorizationManager {
                     finalResult.complete(produceAuthorized);
                     return;
                 }
-            } else if (log.isDebugEnabled()) {
-                log.debug("Destination [{}] Role [{}] exception occured while 
trying to check Produce permissions. {}",
-                        destination.toString(), role, ex.getMessage());
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(
+                            "Destination [{}] Role [{}] exception occured 
while trying to check Produce permissions. {}",
+                            destination.toString(), role, ex.getMessage());
+                }
             }
             canConsumeAsync(destination, role, 
null).whenComplete((consumeAuthorized, e) -> {
                 if (e == null) {
@@ -181,10 +184,15 @@ public class AuthorizationManager {
                         finalResult.complete(consumeAuthorized);
                         return;
                     }
-                } else if (log.isDebugEnabled()) {
-                    log.debug(
-                            "Destination [{}] Role [{}] exception occured 
while trying to check Consume permissions. {}",
-                            destination.toString(), role, e.getMessage());
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(
+                                "Destination [{}] Role [{}] exception occured 
while trying to check Consume permissions. {}",
+                                destination.toString(), role, e.getMessage());
+
+                    }
+                    finalResult.completeExceptionally(e);
+                    return;
                 }
                 finalResult.complete(false);
             });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8888c97..dfa4fd7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@ import static 
org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
 import java.net.SocketAddress;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -77,6 +79,8 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -89,7 +93,7 @@ public class ServerCnx extends PulsarHandler {
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private State state;
     private volatile boolean isActive = true;
-    private String authRole = null;
+    String authRole = null;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write 
spikes on the broker.
@@ -101,7 +105,8 @@ public class ServerCnx extends PulsarHandler {
     private int nonPersistentPendingMessages = 0;
     private final int MaxNonPersistentPendingMessages;
     private String originalPrincipal = null;
-
+    private Set<String> proxyRoles = Sets.newHashSet();
+    
     enum State {
         Start, Connected, Failed
     }
@@ -117,6 +122,7 @@ public class ServerCnx extends PulsarHandler {
         this.replicatorPrefix = 
service.pulsar().getConfiguration().getReplicatorPrefix();
         this.MaxNonPersistentPendingMessages = 
service.pulsar().getConfiguration()
                 .getMaxConcurrentNonPersistentMessagePerConnection();
+        this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
     }
 
     @Override
@@ -180,6 +186,19 @@ public class ServerCnx extends PulsarHandler {
         ctx.close();
     }
 
+    private boolean validateOriginalPrincipal(String originalPrincipal, 
ByteBuf errorResponse, String topicName,
+            String msg) {
+        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
+                && (StringUtils.isBlank(originalPrincipal) || 
proxyRoles.contains(originalPrincipal))) {
+            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic 
{}", remoteAddress, msg, authRole,
+                    originalPrincipal, topicName);
+            ctx.writeAndFlush(errorResponse);
+            return false;
+        }
+
+        return true;
+    }
+    
     // ////
     // // Incoming commands handling
     // ////
@@ -196,6 +215,13 @@ public class ServerCnx extends PulsarHandler {
         if (lookupSemaphore.tryAcquire()) {
             final String originalPrincipal = lookup.hasOriginalPrincipal() ? 
lookup.getOriginalPrincipal()
                     : this.originalPrincipal;
+            if (!validateOriginalPrincipal(originalPrincipal,
+                    newLookupErrorResponse(ServerError.AuthorizationError,
+                            "Valid Proxy Client role should be provided for 
lookup ", requestId),
+                    topicName, "Valid Proxy Client role should be provided for 
lookup ")) {
+                lookupSemaphore.release();
+                return;
+            }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -253,9 +279,15 @@ public class ServerCnx extends PulsarHandler {
         }
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-
             final String originalPrincipal = 
partitionMetadata.hasOriginalPrincipal()
                     ? partitionMetadata.getOriginalPrincipal() : 
this.originalPrincipal;
+            if (!validateOriginalPrincipal(originalPrincipal,
+                    
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+                            "Valid Proxy Client role should be provided for 
getPartitionMetadataRequest ", requestId),
+                    topicName, "Valid Proxy Client role should be provided for 
getPartitionMetadataRequest ")) {
+                lookupSemaphore.release();
+                return;
+            }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -417,7 +449,12 @@ public class ServerCnx extends PulsarHandler {
         final String topicName = subscribe.getTopic();
         final long requestId = subscribe.getRequestId();
         final long consumerId = subscribe.getConsumerId();
-
+        if (!validateOriginalPrincipal(originalPrincipal,
+                Commands.newError(requestId, ServerError.AuthorizationError,
+                        "Valid Proxy Client role should be provided while 
subscribing "),
+                topicName, "Valid Proxy Client role should be provided while 
subscribing ")) {
+            return;
+        }
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
             isProxyAuthorizedFuture = 
service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
@@ -570,6 +607,13 @@ public class ServerCnx extends PulsarHandler {
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
 
+        if (!validateOriginalPrincipal(originalPrincipal,
+                Commands.newError(requestId, ServerError.AuthorizationError,
+                        "Valid Proxy Client role should be provided while 
creating producer "),
+                topicName, "Valid Proxy Client role should be provided while 
creating producer ")) {
+            return;
+        }
+        
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
             isProxyAuthorizedFuture = 
service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 058a4da..b604cfd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1341,6 +1341,7 @@ public class ServerCnxTest {
             channel.close().get();
         }
         serverCnx = new ServerCnx(brokerService);
+        serverCnx.authRole = "";
         channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
new file mode 100644
index 0000000..73bf11f
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+
+    public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
+        private String authParam;
+
+        public BasicAuthenticationData(String authParam) {
+            this.authParam = authParam;
+        }
+        
+        public boolean hasDataFromCommand() {
+            return true;
+        }
+
+        public String getCommandData() {
+            return authParam;
+        }
+        
+        public boolean hasDataForHttp() {
+            return true;
+        }
+        
+        @Override
+        public Set<Entry<String, String>> getHttpHeaders() {
+            Map<String, String> headers = new HashMap<>();
+            headers.put("BasicAuthentication", authParam);
+            return headers.entrySet();
+        }
+    }
+    
+    public static class BasicAuthentication implements Authentication {
+
+        private String authParam;
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
+            try {
+                return new BasicAuthenticationData(authParam);
+            } catch (Exception e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            this.authParam = authParams.get("authParam");
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // noop
+        }
+    }
+    
+    public static class BasicAuthenticationProvider implements 
AuthenticationProvider {
+
+        @Override
+        public void close() throws IOException {            
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException 
{            
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
+            if (authData.hasDataFromCommand()) {
+                return authData.getCommandData();
+            } else if (authData.hasDataFromHttp()) {
+                return authData.getHttpHeader("BasicAuthentication");
+            }
+            
+            return null;
+        }
+    }
+
+    private int webServicePort;
+    private int servicePort;
+    
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        webServicePort = PortManager.nextFreePort();
+        servicePort = PortManager.nextFreePort();
+        // enable tls and auth&auth at broker 
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        conf.setTlsEnabled(false);
+        
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        
+        Set<String> superUserRoles = new HashSet<String>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        
+        Set<String> providers = new HashSet<String>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+        Set<String> proxyRoles = new HashSet<String>();
+        proxyRoles.add("proxy");
+        conf.setProxyRoles(proxyRoles);
+
+        super.init();
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();       
+    }
+    
+    @Test
+    void testIncorrectRoles() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // Step 1: Create Admin Client
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+        // create a client which connects to proxy and pass authData
+        String namespaceName = "my-property/use/my-ns";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String subscriptionName = "my-subscriber-name";
+        String clientAuthParams = "authParam:client";
+        String proxyAuthParams = "authParam:proxy";
+        
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace(namespaceName);
+        
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        // Step 2: Try to use proxy Client as a normal Client - expect 
exception
+        PulsarClient proxyClient = createPulsarClient("pulsar://localhost:" + 
BROKER_PORT, proxyAuthParams);
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer;
+        boolean exceptionOccured = false;
+        try {
+            consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        } catch(Exception ex) {
+            exceptionOccured = true;
+        }         
+        Assert.assertTrue(exceptionOccured);
+        
+        // Step 4: Run Pulsar Proxy and pass proxy params as client params - 
expect exception
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+
+        proxyConfig.setServicePort(servicePort);
+        proxyConfig.setWebServicePort(webServicePort);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        
+        
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        ProxyService proxyService = new ProxyService(proxyConfig);
+
+        proxyService.start();
+        proxyClient = createPulsarClient(proxyServiceUrl, proxyAuthParams);
+        try {
+            consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        } catch(Exception ex) {
+            exceptionOccured = true;
+        } 
+        
+        Assert.assertTrue(exceptionOccured);
+        
+        // Step 4: Pass correct client params 
+        proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams);
+        consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        proxyService.close();
+    }
+
+    private void createAdminClient() throws PulsarClientException {
+        String adminAuthParams = "authParam:admin";
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), 
adminAuthParams);
+
+        admin = spy(new PulsarAdmin(brokerUrl, clientConf));        
+    }
+    
+    private PulsarClient createPulsarClient(String proxyServiceUrl, String 
authParams) throws PulsarClientException {
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), 
authParams);
+        return PulsarClient.create(proxyServiceUrl, clientConf);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to