This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fd3ce8b5786 [cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#19270)
fd3ce8b5786 is described below
commit fd3ce8b5786baf0b76f301bd9597cd0b99a412f1
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Feb 6 08:02:26 2023 -0600
[cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#19270)
---
.../apache/pulsar/broker/service/ServerCnx.java | 86 ++++++++--------------
.../pulsar/broker/service/ServerCnxTest.java | 36 +++++++++
2 files changed, 66 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e355f87581b..988f7d34e99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -397,17 +397,30 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ctx.close();
}
- /*
- * If authentication and authorization is enabled(and not sasl) and
- * if the authRole is one of proxyRoles we want to enforce
+ /**
+ * When transitioning from Connecting to Connected, this method validates
the roles.
+ * If the authRole is one of proxyRoles, the following must be true:
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
- * - originalPrincipal is not a proxy principal
+ * - originalPrincipal is not a proxy principal.
+ * @return true when roles are valid and false when roles are invalid
*/
- private boolean invalidOriginalPrincipal(String originalPrincipal) {
- return (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()
- && proxyRoles.contains(authRole) &&
(StringUtils.isBlank(originalPrincipal)
- || proxyRoles.contains(originalPrincipal)));
+ private boolean isValidRoleAndOriginalPrincipal() {
+ String errorMsg = null;
+ if (proxyRoles.contains(authRole)) {
+ if (StringUtils.isBlank(originalPrincipal)) {
+ errorMsg = "originalPrincipal must be provided when connecting
with a proxy role.";
+ } else if (proxyRoles.contains(originalPrincipal)) {
+ errorMsg = "originalPrincipal cannot be a proxy role.";
+ }
+ }
+ if (errorMsg != null) {
+ log.warn("[{}] Illegal combination of role [{}] and
originalPrincipal [{}]: {}", remoteAddress, authRole,
+ originalPrincipal, errorMsg);
+ return false;
+ } else {
+ return true;
+ }
}
// ////
@@ -489,14 +502,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for lookup ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
topic {}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
-
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
- lookupSemaphore.release();
- return;
- }
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -570,14 +575,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for getPartitionMetadataRequest ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
topic {}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
-
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
msg, requestId);
- lookupSemaphore.release();
- return;
- }
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -693,6 +690,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion)
{
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ if (!isValidRoleAndOriginalPrincipal()) {
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+ return;
+ }
+ }
writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, enableSubscriptionPatternEvaluation));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
@@ -1065,14 +1071,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, authRole, originalPrincipal);
}
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
while subscribing ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- return;
- }
-
final String subscriptionName = subscribe.getSubscription();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.hasConsumerName() ?
subscribe.getConsumerName() : "";
@@ -1318,14 +1316,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
while creating producer ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic
{}", remoteAddress, msg, authRole,
- originalPrincipal, topicName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- return;
- }
-
CompletableFuture<Boolean> isAuthorizedFuture =
isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, authenticationData,
originalAuthData
);
@@ -2169,14 +2159,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for getTopicsOfNamespaceRequest ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
- authRole, originalPrincipal, namespaceName);
- commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
- lookupSemaphore.release();
- return;
- }
isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName,
mode)
@@ -2735,14 +2717,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- if (invalidOriginalPrincipal(originalPrincipal)) {
- final String msg = "Valid Proxy Client role should be provided
for watchTopicListRequest ";
- log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
- authRole, originalPrincipal, namespaceName);
- commandSender.sendErrorResponse(watcherId,
ServerError.AuthorizationError, msg);
- lookupSemaphore.release();
- return;
- }
isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
topicListService.handleWatchTopicList(namespaceName,
watcherId, requestId, topicsPattern,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index bd4adef2cd1..2ee30ed1a81 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -583,6 +583,42 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithInvalidRoleCombinations() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(false);
+ svcConfig.setAuthorizationEnabled(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+ // Invalid combinations where authData is proxy role
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", "pass.proxy");
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", "");
+ verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName,
"pass.proxy", null);
+ }
+
+ private void verifyAuthRoleAndOriginalPrincipalBehavior(String
authMethodName, String authData,
+ String
originalPrincipal) throws Exception {
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName, authData,
1,null,
+ null, originalPrincipal, null, null);
+ channel.writeInbound(clientCommand);
+
+ Object response = getResponse();
+ assertTrue(response instanceof CommandError);
+ assertEquals(((CommandError) response).getError(),
ServerError.AuthorizationError);
+ assertEquals(serverCnx.getState(), State.Failed);
+ channel.finish();
+ }
+
@Test(timeOut = 30000)
public void testConnectCommandWithAuthenticationNegative() throws
Exception {
AuthenticationService authenticationService =
mock(AuthenticationService.class);