jai1 closed pull request #1168: Proxy roles enforcement
URL: https://github.com/apache/incubator-pulsar/pull/1168
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0cc40011b..b542ee416 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -193,6 +193,10 @@
     // do all admin operations and publish/consume from all topics
     private Set<String> superUserRoles = Sets.newTreeSet();
 
+    // Role names that are treated as "proxy roles". If the broker sees a 
request with
+    // role as proxyRoles - it will demand to see the original client role or 
certificate.
+    private Set<String> proxyRoles = Sets.newTreeSet();
+
     // Allow wildcard matching in authorization
     // (wildcard matching only applicable if wildcard-char:
     // * presents at first or last position eg: *.pulsar.service, 
pulsar.service.*)
@@ -794,7 +798,15 @@ public void setAuthorizationEnabled(boolean 
authorizationEnabled) {
     public Set<String> getSuperUserRoles() {
         return superUserRoles;
     }
-
+ 
+    public Set<String> getProxyRoles() {
+        return proxyRoles;
+    }
+    
+    public void setProxyRoles(Set<String> proxyRoles) {
+        this.proxyRoles = proxyRoles;
+    }
+    
     public boolean getAuthorizationAllowWildcardsMatching() {
         return authorizationAllowWildcardsMatching;
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 2bf7ce6df..0950ae27a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -171,9 +171,12 @@ public boolean canLookup(DestinationName destination, 
String role) throws Except
                     finalResult.complete(produceAuthorized);
                     return;
                 }
-            } else if (log.isDebugEnabled()) {
-                log.debug("Destination [{}] Role [{}] exception occured while 
trying to check Produce permissions. {}",
-                        destination.toString(), role, ex.getMessage());
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(
+                            "Destination [{}] Role [{}] exception occured 
while trying to check Produce permissions. {}",
+                            destination.toString(), role, ex.getMessage());
+                }
             }
             canConsumeAsync(destination, role, 
null).whenComplete((consumeAuthorized, e) -> {
                 if (e == null) {
@@ -181,10 +184,15 @@ public boolean canLookup(DestinationName destination, 
String role) throws Except
                         finalResult.complete(consumeAuthorized);
                         return;
                     }
-                } else if (log.isDebugEnabled()) {
-                    log.debug(
-                            "Destination [{}] Role [{}] exception occured 
while trying to check Consume permissions. {}",
-                            destination.toString(), role, e.getMessage());
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(
+                                "Destination [{}] Role [{}] exception occured 
while trying to check Consume permissions. {}",
+                                destination.toString(), role, e.getMessage());
+
+                    }
+                    finalResult.completeExceptionally(e);
+                    return;
                 }
                 finalResult.complete(false);
             });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8888c97e6..dfa4fd7ae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@
 
 import java.net.SocketAddress;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -77,6 +79,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -89,7 +93,7 @@
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private State state;
     private volatile boolean isActive = true;
-    private String authRole = null;
+    String authRole = null;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write 
spikes on the broker.
@@ -101,7 +105,8 @@
     private int nonPersistentPendingMessages = 0;
     private final int MaxNonPersistentPendingMessages;
     private String originalPrincipal = null;
-
+    private Set<String> proxyRoles = Sets.newHashSet();
+    
     enum State {
         Start, Connected, Failed
     }
@@ -117,6 +122,7 @@ public ServerCnx(BrokerService service) {
         this.replicatorPrefix = 
service.pulsar().getConfiguration().getReplicatorPrefix();
         this.MaxNonPersistentPendingMessages = 
service.pulsar().getConfiguration()
                 .getMaxConcurrentNonPersistentMessagePerConnection();
+        this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
     }
 
     @Override
@@ -180,6 +186,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) throws E
         ctx.close();
     }
 
+    private boolean validateOriginalPrincipal(String originalPrincipal, 
ByteBuf errorResponse, String topicName,
+            String msg) {
+        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
+                && (StringUtils.isBlank(originalPrincipal) || 
proxyRoles.contains(originalPrincipal))) {
+            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic 
{}", remoteAddress, msg, authRole,
+                    originalPrincipal, topicName);
+            ctx.writeAndFlush(errorResponse);
+            return false;
+        }
+
+        return true;
+    }
+    
     // ////
     // // Incoming commands handling
     // ////
@@ -196,6 +215,13 @@ protected void handleLookup(CommandLookupTopic lookup) {
         if (lookupSemaphore.tryAcquire()) {
             final String originalPrincipal = lookup.hasOriginalPrincipal() ? 
lookup.getOriginalPrincipal()
                     : this.originalPrincipal;
+            if (!validateOriginalPrincipal(originalPrincipal,
+                    newLookupErrorResponse(ServerError.AuthorizationError,
+                            "Valid Proxy Client role should be provided for 
lookup ", requestId),
+                    topicName, "Valid Proxy Client role should be provided for 
lookup ")) {
+                lookupSemaphore.release();
+                return;
+            }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -253,9 +279,15 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
         }
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-
             final String originalPrincipal = 
partitionMetadata.hasOriginalPrincipal()
                     ? partitionMetadata.getOriginalPrincipal() : 
this.originalPrincipal;
+            if (!validateOriginalPrincipal(originalPrincipal,
+                    
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+                            "Valid Proxy Client role should be provided for 
getPartitionMetadataRequest ", requestId),
+                    topicName, "Valid Proxy Client role should be provided for 
getPartitionMetadataRequest ")) {
+                lookupSemaphore.release();
+                return;
+            }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -417,7 +449,12 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
         final String topicName = subscribe.getTopic();
         final long requestId = subscribe.getRequestId();
         final long consumerId = subscribe.getConsumerId();
-
+        if (!validateOriginalPrincipal(originalPrincipal,
+                Commands.newError(requestId, ServerError.AuthorizationError,
+                        "Valid Proxy Client role should be provided while 
subscribing "),
+                topicName, "Valid Proxy Client role should be provided while 
subscribing ")) {
+            return;
+        }
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
             isProxyAuthorizedFuture = 
service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
@@ -570,6 +607,13 @@ protected void handleProducer(final CommandProducer 
cmdProducer) {
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
 
+        if (!validateOriginalPrincipal(originalPrincipal,
+                Commands.newError(requestId, ServerError.AuthorizationError,
+                        "Valid Proxy Client role should be provided while 
creating producer "),
+                topicName, "Valid Proxy Client role should be provided while 
creating producer ")) {
+            return;
+        }
+        
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
             isProxyAuthorizedFuture = 
service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 058a4dae5..b604cfdad 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1341,6 +1341,7 @@ private void resetChannel() throws Exception {
             channel.close().get();
         }
         serverCnx = new ServerCnx(brokerService);
+        serverCnx.authRole = "";
         channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
new file mode 100644
index 000000000..73bf11f43
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+
+    public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
+        private String authParam;
+
+        public BasicAuthenticationData(String authParam) {
+            this.authParam = authParam;
+        }
+        
+        public boolean hasDataFromCommand() {
+            return true;
+        }
+
+        public String getCommandData() {
+            return authParam;
+        }
+        
+        public boolean hasDataForHttp() {
+            return true;
+        }
+        
+        @Override
+        public Set<Entry<String, String>> getHttpHeaders() {
+            Map<String, String> headers = new HashMap<>();
+            headers.put("BasicAuthentication", authParam);
+            return headers.entrySet();
+        }
+    }
+    
+    public static class BasicAuthentication implements Authentication {
+
+        private String authParam;
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
+            try {
+                return new BasicAuthenticationData(authParam);
+            } catch (Exception e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            this.authParam = authParams.get("authParam");
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // noop
+        }
+    }
+    
+    public static class BasicAuthenticationProvider implements 
AuthenticationProvider {
+
+        @Override
+        public void close() throws IOException {            
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException 
{            
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
+            if (authData.hasDataFromCommand()) {
+                return authData.getCommandData();
+            } else if (authData.hasDataFromHttp()) {
+                return authData.getHttpHeader("BasicAuthentication");
+            }
+            
+            return null;
+        }
+    }
+
+    private int webServicePort;
+    private int servicePort;
+    
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        webServicePort = PortManager.nextFreePort();
+        servicePort = PortManager.nextFreePort();
+        // enable auth&auth at broker (TLS stays disabled) 
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        conf.setTlsEnabled(false);
+        
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        
+        Set<String> superUserRoles = new HashSet<String>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        
+        Set<String> providers = new HashSet<String>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+        Set<String> proxyRoles = new HashSet<String>();
+        proxyRoles.add("proxy");
+        conf.setProxyRoles(proxyRoles);
+
+        super.init();
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();       
+    }
+    
+    @Test
+    void testIncorrectRoles() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // Step 1: Create Admin Client
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+        // create a client which connects to proxy and pass authData
+        String namespaceName = "my-property/use/my-ns";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String subscriptionName = "my-subscriber-name";
+        String clientAuthParams = "authParam:client";
+        String proxyAuthParams = "authParam:proxy";
+        
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace(namespaceName);
+        
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        // Step 2: Try to use proxy Client as a normal Client - expect 
exception
+        PulsarClient proxyClient = createPulsarClient("pulsar://localhost:" + 
BROKER_PORT, proxyAuthParams);
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer;
+        boolean exceptionOccured = false;
+        try {
+            consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        } catch(Exception ex) {
+            exceptionOccured = true;
+        }         
+        Assert.assertTrue(exceptionOccured);
+        
+        // Step 3: Run Pulsar Proxy and pass proxy params as client params - 
expect exception
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+
+        proxyConfig.setServicePort(servicePort);
+        proxyConfig.setWebServicePort(webServicePort);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        
+        
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        ProxyService proxyService = new ProxyService(proxyConfig);
+
+        proxyService.start();
+        proxyClient = createPulsarClient(proxyServiceUrl, proxyAuthParams);
+        try {
+            consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        } catch(Exception ex) {
+            exceptionOccured = true;
+        } 
+        
+        Assert.assertTrue(exceptionOccured);
+        
+        // Step 4: Pass correct client params 
+        proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams);
+        consumer = proxyClient.subscribe(topicName, subscriptionName,
+                consumerConf);
+        proxyService.close();
+    }
+
+    private void createAdminClient() throws PulsarClientException {
+        String adminAuthParams = "authParam:admin";
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), 
adminAuthParams);
+
+        admin = spy(new PulsarAdmin(brokerUrl, clientConf));        
+    }
+    
+    private PulsarClient createPulsarClient(String proxyServiceUrl, String 
authParams) throws PulsarClientException {
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), 
authParams);
+        return PulsarClient.create(proxyServiceUrl, clientConf);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to