This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 01bd9860dbb [cleanup][broker] Validate originalPrincipal earlier in
ServerCnx (#19270)
01bd9860dbb is described below
commit 01bd9860dbb272509bbc13bd71a3983715f84a2a
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 12:44:18 2023 -0600
[cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#19270)
(cherry picked from commit fd3ce8b5786baf0b76f301bd9597cd0b99a412f1)
(cherry picked from commit 2847dd19f6e8a546f4d45bf51eb2b72aae0869ce)
---
.../apache/pulsar/broker/service/ServerCnx.java | 78 +++++++++-------------
.../pulsar/broker/service/ServerCnxTest.java | 37 ++++++++++
2 files changed, 67 insertions(+), 48 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fd73156ae2d..be2386bd369 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -367,17 +367,30 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ctx.close();
}
- /*
- * If authentication and authorization is enabled(and not sasl) and
- * if the authRole is one of proxyRoles we want to enforce
+ /**
+ * When transitioning from Connecting to Connected, this method validates
the roles.
+ * If the authRole is one of proxyRoles, the following must be true:
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
- * - originalPrincipal is not a proxy principal
+ * - originalPrincipal is not a proxy principal.
+ * @return true when roles are valid and false when roles are invalid
*/
- private boolean invalidOriginalPrincipal(String originalPrincipal) {
- return (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()
- && proxyRoles.contains(authRole) &&
(StringUtils.isBlank(originalPrincipal)
- || proxyRoles.contains(originalPrincipal)));
+ private boolean isValidRoleAndOriginalPrincipal() {
+ String errorMsg = null;
+ if (proxyRoles.contains(authRole)) {
+ if (StringUtils.isBlank(originalPrincipal)) {
+ errorMsg = "originalPrincipal must be provided when connecting
with a proxy role.";
+ } else if (proxyRoles.contains(originalPrincipal)) {
+ errorMsg = "originalPrincipal cannot be a proxy role.";
+ }
+ }
+ if (errorMsg != null) {
+ log.warn("[{}] Illegal combination of role [{}] and
originalPrincipal [{}]: {}", remoteAddress, authRole,
+ originalPrincipal, errorMsg);
+ return false;
+ } else {
+ return true;
+ }
}
// ////
@@ -448,14 +461,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for lookup ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
topic {}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
- lookupSemaphore.release();
- return;
- }
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -512,14 +517,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for getPartitionMetadataRequest ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
topic {}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
-
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
msg, requestId);
- lookupSemaphore.release();
- return;
- }
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -629,6 +626,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion)
{
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ if (!isValidRoleAndOriginalPrincipal()) {
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
+
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
@@ -942,14 +948,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, authRole, originalPrincipal);
}
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
while subscribing ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- return;
- }
-
final String subscriptionName = subscribe.getSubscription();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.hasConsumerName() ?
subscribe.getConsumerName() : "";
@@ -1190,14 +1188,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
while creating producer ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- return;
- }
-
CompletableFuture<Boolean> isAuthorizedFuture =
isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, authenticationData,
originalAuthData
);
@@ -2012,14 +2002,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for getTopicsOfNamespaceRequest ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
- authRole, originalPrincipal, namespaceName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- lookupSemaphore.release();
- return;
- }
isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName,
mode)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ee1789b322f..34cc3ba2232 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -398,6 +399,42 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithInvalidRoleCombinations() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(false);
+ svcConfig.setAuthorizationEnabled(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+ // Invalid combinations where authData is proxy role
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", "pass.proxy");
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", "");
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", null);
+ }
+
+ private void verifyAuthRoleAndOriginalPrincipalBehavior(String
authMethodName, String authData,
+ String
originalPrincipal) throws Exception {
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName, authData,
1,null,
+ null, originalPrincipal, null, null);
+ channel.writeInbound(clientCommand);
+
+ Object response = getResponse();
+ assertTrue(response instanceof CommandError);
+ assertEquals(((CommandError) response).getError(),
ServerError.AuthorizationError);
+ assertEquals(serverCnx.getState(), State.Failed);
+ channel.finish();
+ }
+
@Test(timeOut = 30000)
public void testConnectCommandWithAuthenticationNegative() throws
Exception {
AuthenticationService authenticationService =
mock(AuthenticationService.class);