jai1 closed pull request #1168: Proxy roles enforcement
URL: https://github.com/apache/incubator-pulsar/pull/1168
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0cc40011b..b542ee416 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -193,6 +193,10 @@
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();
+ // Role names that are treated as "proxy roles". If the broker sees a
request with
+ // role as proxyRoles - it will demand to see the original client role or
certificate.
+ private Set<String> proxyRoles = Sets.newTreeSet();
+
// Allow wildcard matching in authorization
// (wildcard matching only applicable if wildcard-char:
// * presents at first or last position eg: *.pulsar.service,
pulsar.service.*)
@@ -794,7 +798,15 @@ public void setAuthorizationEnabled(boolean
authorizationEnabled) {
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
-
+
+ public Set<String> getProxyRoles() {
+ return proxyRoles;
+ }
+
+ public void setProxyRoles(Set<String> proxyRoles) {
+ this.proxyRoles = proxyRoles;
+ }
+
public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 2bf7ce6df..0950ae27a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -171,9 +171,12 @@ public boolean canLookup(DestinationName destination,
String role) throws Except
finalResult.complete(produceAuthorized);
return;
}
- } else if (log.isDebugEnabled()) {
- log.debug("Destination [{}] Role [{}] exception occured while
trying to check Produce permissions. {}",
- destination.toString(), role, ex.getMessage());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured
while trying to check Produce permissions. {}",
+ destination.toString(), role, ex.getMessage());
+ }
}
canConsumeAsync(destination, role,
null).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
@@ -181,10 +184,15 @@ public boolean canLookup(DestinationName destination,
String role) throws Except
finalResult.complete(consumeAuthorized);
return;
}
- } else if (log.isDebugEnabled()) {
- log.debug(
- "Destination [{}] Role [{}] exception occured
while trying to check Consume permissions. {}",
- destination.toString(), role, e.getMessage());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured
while trying to check Consume permissions. {}",
+ destination.toString(), role, e.getMessage());
+
+ }
+ finalResult.completeExceptionally(e);
+ return;
}
finalResult.complete(false);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8888c97e6..dfa4fd7ae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@
import java.net.SocketAddress;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -77,6 +79,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -89,7 +93,7 @@
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
- private String authRole = null;
+ String authRole = null;
// Max number of pending requests per connections. If multiple producers
are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write
spikes on the broker.
@@ -101,7 +105,8 @@
private int nonPersistentPendingMessages = 0;
private final int MaxNonPersistentPendingMessages;
private String originalPrincipal = null;
-
+ private Set<String> proxyRoles = Sets.newHashSet();
+
enum State {
Start, Connected, Failed
}
@@ -117,6 +122,7 @@ public ServerCnx(BrokerService service) {
this.replicatorPrefix =
service.pulsar().getConfiguration().getReplicatorPrefix();
this.MaxNonPersistentPendingMessages =
service.pulsar().getConfiguration()
.getMaxConcurrentNonPersistentMessagePerConnection();
+ this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
}
@Override
@@ -180,6 +186,19 @@ public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws E
ctx.close();
}
+ private boolean validateOriginalPrincipal(String originalPrincipal,
ByteBuf errorResponse, String topicName,
+ String msg) {
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
+ && (StringUtils.isBlank(originalPrincipal) ||
proxyRoles.contains(originalPrincipal))) {
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
+ originalPrincipal, topicName);
+ ctx.writeAndFlush(errorResponse);
+ return false;
+ }
+
+ return true;
+ }
+
// ////
// // Incoming commands handling
// ////
@@ -196,6 +215,13 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = lookup.hasOriginalPrincipal() ?
lookup.getOriginalPrincipal()
: this.originalPrincipal;
+ if (!validateOriginalPrincipal(originalPrincipal,
+ newLookupErrorResponse(ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided for
lookup ", requestId),
+ topicName, "Valid Proxy Client role should be provided for
lookup ")) {
+ lookupSemaphore.release();
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null)
{
isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -253,9 +279,15 @@ protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
-
final String originalPrincipal =
partitionMetadata.hasOriginalPrincipal()
? partitionMetadata.getOriginalPrincipal() :
this.originalPrincipal;
+ if (!validateOriginalPrincipal(originalPrincipal,
+
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided for
getPartitionMetadataRequest ", requestId),
+ topicName, "Valid Proxy Client role should be provided for
getPartitionMetadataRequest ")) {
+ lookupSemaphore.release();
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null)
{
isProxyAuthorizedFuture = service.getAuthorizationManager()
@@ -417,7 +449,12 @@ protected void handleSubscribe(final CommandSubscribe
subscribe) {
final String topicName = subscribe.getTopic();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
-
+ if (!validateOriginalPrincipal(originalPrincipal,
+ Commands.newError(requestId, ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided while
subscribing "),
+ topicName, "Valid Proxy Client role should be provided while
subscribing ")) {
+ return;
+ }
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture =
service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
@@ -570,6 +607,13 @@ protected void handleProducer(final CommandProducer
cmdProducer) {
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
+ if (!validateOriginalPrincipal(originalPrincipal,
+ Commands.newError(requestId, ServerError.AuthorizationError,
+ "Valid Proxy Client role should be provided while
creating producer "),
+ topicName, "Valid Proxy Client role should be provided while
creating producer ")) {
+ return;
+ }
+
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture =
service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 058a4dae5..b604cfdad 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1341,6 +1341,7 @@ private void resetChannel() throws Exception {
channel.close().get();
}
serverCnx = new ServerCnx(brokerService);
+ serverCnx.authRole = "";
channel = new EmbeddedChannel(new
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
new file mode 100644
index 000000000..73bf11f43
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+
+ public static class BasicAuthenticationData implements
AuthenticationDataProvider {
+ private String authParam;
+
+ public BasicAuthenticationData(String authParam) {
+ this.authParam = authParam;
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return authParam;
+ }
+
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set<Entry<String, String>> getHttpHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("BasicAuthentication", authParam);
+ return headers.entrySet();
+ }
+ }
+
+ public static class BasicAuthentication implements Authentication {
+
+ private String authParam;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws
PulsarClientException {
+ try {
+ return new BasicAuthenticationData(authParam);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ this.authParam = authParams.get("authParam");
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+ }
+
+ public static class BasicAuthenticationProvider implements
AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException
{
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
+ if (authData.hasDataFromCommand()) {
+ return authData.getCommandData();
+ } else if (authData.hasDataFromHttp()) {
+ return authData.getHttpHeader("BasicAuthentication");
+ }
+
+ return null;
+ }
+ }
+
+ private int webServicePort;
+ private int servicePort;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ webServicePort = PortManager.nextFreePort();
+ servicePort = PortManager.nextFreePort();
+ // enable tls and auth&auth at broker
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ conf.setTlsEnabled(false);
+
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ conf.setBrokerClientAuthenticationParameters("authParam:broker");
+
+ Set<String> superUserRoles = new HashSet<String>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<String>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName("use");
+ Set<String> proxyRoles = new HashSet<String>();
+ proxyRoles.add("proxy");
+ conf.setProxyRoles(proxyRoles);
+
+ super.init();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testIncorrectRoles() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ createAdminClient();
+ final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+ // create a client which connects to proxy and pass authData
+ String namespaceName = "my-property/use/my-ns";
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+ String clientAuthParams = "authParam:client";
+ String proxyAuthParams = "authParam:proxy";
+
+ admin.properties().createProperty("my-property",
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"),
Sets.newHashSet("use")));
+ admin.namespaces().createNamespace(namespaceName);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Step 2: Try to use proxy Client as a normal Client - expect
exception
+ PulsarClient proxyClient = createPulsarClient("pulsar://localhost:" +
BROKER_PORT, proxyAuthParams);
+ ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+ consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+ Consumer consumer;
+ boolean exceptionOccured = false;
+ try {
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ } catch(Exception ex) {
+ exceptionOccured = true;
+ }
+ Assert.assertTrue(exceptionOccured);
+
+ // Step 4: Run Pulsar Proxy and pass proxy params as client params -
expect exception
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+
+ proxyConfig.setServicePort(servicePort);
+ proxyConfig.setWebServicePort(webServicePort);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+
+
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ ProxyService proxyService = new ProxyService(proxyConfig);
+
+ proxyService.start();
+ proxyClient = createPulsarClient(proxyServiceUrl, proxyAuthParams);
+ try {
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ } catch(Exception ex) {
+ exceptionOccured = true;
+ }
+
+ Assert.assertTrue(exceptionOccured);
+
+ // Step 4: Pass correct client params
+ proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams);
+ consumer = proxyClient.subscribe(topicName, subscriptionName,
+ consumerConf);
+ proxyService.close();
+ }
+
+ private void createAdminClient() throws PulsarClientException {
+ String adminAuthParams = "authParam:admin";
+ org.apache.pulsar.client.api.ClientConfiguration clientConf = new
org.apache.pulsar.client.api.ClientConfiguration();
+ clientConf.setAuthentication(BasicAuthentication.class.getName(),
adminAuthParams);
+
+ admin = spy(new PulsarAdmin(brokerUrl, clientConf));
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl, String
authParams) throws PulsarClientException {
+ org.apache.pulsar.client.api.ClientConfiguration clientConf = new
org.apache.pulsar.client.api.ClientConfiguration();
+ clientConf.setAuthentication(BasicAuthentication.class.getName(),
authParams);
+ return PulsarClient.create(proxyServiceUrl, clientConf);
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services