This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa247ad7276 [feat][broker] PIP 97: Implement for ServerCnx (#19409)
aa247ad7276 is described below
commit aa247ad72760ae95f3e7f1a969a6d83121319472
Author: Michael Marshall <[email protected]>
AuthorDate: Sat Feb 4 02:40:42 2023 -0600
[feat][broker] PIP 97: Implement for ServerCnx (#19409)
---
.../apache/pulsar/broker/service/ServerCnx.java | 222 +++++-----
.../broker/service/ServerCnxAuthorizationTest.java | 400 ------------------
.../pulsar/broker/service/ServerCnxTest.java | 445 +++++++++++++++++++--
.../apache/pulsar/sql/presto/TestPulsarAuth.java | 2 +-
.../integration/presto/TestPulsarSQLAuth.java | 2 +-
5 files changed, 553 insertions(+), 518 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 56f3f07fd8c..e355f87581b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -194,6 +194,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// it will hold the credentials of the original client
private AuthenticationState originalAuthState;
private AuthenticationDataSource originalAuthData;
+ // Keep temporarily in order to verify after verifying proxy's authData
+ private AuthData originalAuthDataCopy;
private boolean pendingAuthChallengeResponse = false;
// Max number of pending requests per connections. If multiple producers
are sharing the same connection the flow
@@ -690,8 +692,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
// complete the connect and sent newConnected command
- private void completeConnect(int clientProtoVersion, String clientVersion,
boolean supportsTopicWatchers) {
- writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, supportsTopicWatchers));
+ private void completeConnect(int clientProtoVersion, String clientVersion)
{
+ writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, enableSubscriptionPatternEvaluation));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
if (log.isDebugEnabled()) {
@@ -706,74 +708,135 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
- // According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
- int clientProtocolVersion,
- String clientVersion) throws Exception {
-
+ // According to auth result, send Connected, AuthChallenge, or Error
command.
+ private void doAuthentication(AuthData clientData,
+ boolean useOriginalAuthState,
+ int clientProtocolVersion,
+ final String clientVersion) {
// The original auth state can only be set on subsequent auth attempts
(and only
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
// credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
- AuthData brokerData = authState.authenticate(clientData);
-
if (log.isDebugEnabled()) {
log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
}
+ authState
+ .authenticateAsync(clientData)
+ .whenCompleteAsync((authChallenge, throwable) -> {
+ if (throwable == null) {
+ authChallengeSuccessCallback(authChallenge,
useOriginalAuthState, authRole,
+ clientProtocolVersion, clientVersion);
+ } else {
+ authenticationFailed(throwable);
+ }
+ }, ctx.executor());
+ }
- if (authState.isComplete()) {
- // Authentication has completed. It was either:
- // 1. the 1st time the authentication process was done, in which
case we'll send
- // a `CommandConnected` response
- // 2. an authentication refresh, in which case we need to refresh
authenticationData
-
- String newAuthRole = authState.getAuthRole();
-
- // Refresh the auth data.
- this.authenticationData = authState.getAuthDataSource();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Auth data refreshed for role={}",
remoteAddress, this.authRole);
- }
+ public void authChallengeSuccessCallback(AuthData authChallenge,
+ boolean useOriginalAuthState,
+ String authRole,
+ int clientProtocolVersion,
+ String clientVersion) {
+ try {
+ if (authChallenge == null) {
+ // Authentication has completed. It was either:
+ // 1. the 1st time the authentication process was done, in
which case we'll send
+ // a `CommandConnected` response
+ // 2. an authentication refresh, in which case we need to
refresh authenticationData
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ String newAuthRole = authState.getAuthRole();
+
+ // Refresh the auth data.
+ this.authenticationData = authState.getAuthDataSource();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Auth data refreshed for role={}",
remoteAddress, this.authRole);
+ }
- if (!useOriginalAuthState) {
- this.authRole = newAuthRole;
- }
+ if (!useOriginalAuthState) {
+ this.authRole = newAuthRole;
+ }
- if (log.isDebugEnabled()) {
- log.debug("[{}] Client successfully authenticated with {} role
{} and originalPrincipal {}",
- remoteAddress, authMethod, this.authRole,
originalPrincipal);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Client successfully authenticated with {}
role {} and originalPrincipal {}",
+ remoteAddress, authMethod, this.authRole,
originalPrincipal);
+ }
- if (state != State.Connected) {
- // First time authentication is done
- completeConnect(clientProtocolVersion, clientVersion,
enableSubscriptionPatternEvaluation);
- } else {
- // If the connection was already ready, it means we're doing a
refresh
- if (!StringUtils.isEmpty(authRole)) {
- if (!authRole.equals(newAuthRole)) {
- log.warn("[{}] Principal cannot change during an
authentication refresh expected={} got={}",
- remoteAddress, authRole, newAuthRole);
- ctx.close();
+ if (state != State.Connected) {
+ // First time authentication is done
+ if (originalAuthState != null) {
+ // We only set originalAuthState when we are going to
use it.
+ authenticateOriginalData(clientProtocolVersion,
clientVersion);
} else {
- log.info("[{}] Refreshed authentication credentials
for role {}", remoteAddress, authRole);
+ completeConnect(clientProtocolVersion, clientVersion);
}
+ } else {
+ // If the connection was already ready, it means we're
doing a refresh
+ if (!StringUtils.isEmpty(authRole)) {
+ if (!authRole.equals(newAuthRole)) {
+ log.warn("[{}] Principal cannot change during an
authentication refresh expected={} got={}",
+ remoteAddress, authRole, newAuthRole);
+ ctx.close();
+ } else {
+ log.info("[{}] Refreshed authentication
credentials for role {}", remoteAddress, authRole);
+ }
+ }
+ }
+ } else {
+ // auth not complete, continue auth with client side.
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
authChallenge, clientProtocolVersion));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authentication in progress client by
method {}.", remoteAddress, authMethod);
}
}
-
- return State.Connected;
+ } catch (Exception e) {
+ authenticationFailed(e);
}
+ }
- // auth not complete, continue auth with client side.
- writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
clientProtocolVersion));
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
- log.debug("[{}] connect state change to : [{}]", remoteAddress,
State.Connecting.name());
+ private void authenticateOriginalData(int clientProtoVersion, String
clientVersion) {
+ originalAuthState
+ .authenticateAsync(originalAuthDataCopy)
+ .whenCompleteAsync((authChallenge, throwable) -> {
+ if (throwable != null) {
+ authenticationFailed(throwable);
+ } else if (authChallenge != null) {
+ // The protocol does not yet handle an auth challenge
here.
+ // See https://github.com/apache/pulsar/issues/19291.
+ authenticationFailed(new
AuthenticationException("Failed to authenticate original auth data "
+ + "due to unsupported authChallenge."));
+ } else {
+ try {
+ // No need to retain these bytes anymore
+ originalAuthDataCopy = null;
+ originalAuthData =
originalAuthState.getAuthDataSource();
+ originalPrincipal =
originalAuthState.getAuthRole();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authenticated original role
(forwarded from proxy): {}",
+ remoteAddress, originalPrincipal);
+ }
+ completeConnect(clientProtoVersion, clientVersion);
+ } catch (Exception e) {
+ authenticationFailed(e);
+ }
+ }
+ }, ctx.executor());
+ }
+
+ // Handle authentication and authentication refresh failures. Must be
called from event loop.
+ private void authenticationFailed(Throwable t) {
+ String operation;
+ if (state == State.Connecting) {
+ service.getPulsarStats().recordConnectionCreateFail();
+ operation = "connect";
+ } else {
+ operation = "authentication-refresh";
}
- return State.Connecting;
+ state = State.Failed;
+ logAuthException(remoteAddress, operation, getPrincipal(),
Optional.empty(), t);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate");
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
}
public void refreshAuthenticationCredentials() {
@@ -871,10 +934,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
if (!service.isAuthenticationEnabled()) {
- completeConnect(clientProtocolVersion, clientVersion,
enableSubscriptionPatternEvaluation);
+ completeConnect(clientProtocolVersion, clientVersion);
return;
}
+ // Go to Connecting state now because auth can be async.
+ state = State.Connecting;
+
try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() :
emptyArray;
AuthData clientData = AuthData.of(authData);
@@ -899,10 +965,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
authRole =
getBrokerService().getAuthenticationService().getAnonymousUserRole()
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no
authentication provider configured"));
- completeConnect(clientProtocolVersion, clientVersion,
enableSubscriptionPatternEvaluation);
+ completeConnect(clientProtocolVersion, clientVersion);
return;
}
-
// init authState and other var
ChannelHandler sslHandler =
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
@@ -922,14 +987,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("[{}] Authenticate role : {}", remoteAddress, role);
}
- state = doAuthentication(clientData, clientProtocolVersion,
clientVersion);
-
- // This will fail the check if:
- // 1. client is coming through a proxy
- // 2. we require to validate the original credentials
- // 3. no credentials were passed
if (connect.hasOriginalPrincipal() &&
service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
- // init authentication
+ // Flow:
+ // 1. Initialize original authentication.
+ // 2. Authenticate the proxy's authentication data.
+ // 3. Authenticate the original authentication data.
String originalAuthMethod;
if (connect.hasOriginalAuthMethod()) {
originalAuthMethod = connect.getOriginalAuthMethod();
@@ -947,32 +1009,23 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
+ " using auth method [%s] is not
available", originalAuthMethod));
}
- AuthData originalAuthDataCopy =
AuthData.of(connect.getOriginalAuthData().getBytes());
+ originalAuthDataCopy =
AuthData.of(connect.getOriginalAuthData().getBytes());
originalAuthState =
originalAuthenticationProvider.newAuthState(
originalAuthDataCopy,
remoteAddress,
sslSession);
- originalAuthState.authenticate(originalAuthDataCopy);
- originalAuthData = originalAuthState.getAuthDataSource();
- originalPrincipal = originalAuthState.getAuthRole();
+ } else if (connect.hasOriginalPrincipal()) {
+ originalPrincipal = connect.getOriginalPrincipal();
if (log.isDebugEnabled()) {
- log.debug("[{}] Authenticate original role : {}",
remoteAddress, originalPrincipal);
- }
- } else {
- originalPrincipal = connect.hasOriginalPrincipal() ?
connect.getOriginalPrincipal() : null;
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authenticate original role (forwarded from
proxy): {}",
+ log.debug("[{}] Setting original role (forwarded from
proxy): {}",
remoteAddress, originalPrincipal);
}
}
+
+ doAuthentication(clientData, false, clientProtocolVersion,
clientVersion);
} catch (Exception e) {
- service.getPulsarStats().recordConnectionCreateFail();
- state = State.Failed;
- logAuthException(remoteAddress, "connect", getPrincipal(),
Optional.empty(), e);
- ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Unable to authenticate");
- NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+ authenticationFailed(e);
}
}
@@ -990,21 +1043,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, authResponse.getProtocolVersion(),
+ doAuthentication(clientData, originalAuthState != null,
authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ?
authResponse.getClientVersion() : EMPTY);
- } catch (AuthenticationException e) {
- service.getPulsarStats().recordConnectionCreateFail();
- state = State.Failed;
- log.warn("[{}] Authentication failed: {} ", remoteAddress,
e.getMessage());
- ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Unable to authenticate");
- NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
} catch (Exception e) {
- service.getPulsarStats().recordConnectionCreateFail();
- state = State.Failed;
- String msg = "Unable to handleAuthResponse";
- log.warn("[{}] {} ", remoteAddress, msg, e);
- ByteBuf command = Commands.newError(-1, ServerError.UnknownError,
msg);
- NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
+ authenticationFailed(e);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
deleted file mode 100644
index 03ef2460f06..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.service;
-
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
-import io.jsonwebtoken.Jwts;
-import io.jsonwebtoken.SignatureAlgorithm;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Properties;
-import javax.crypto.SecretKey;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
-import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.api.proto.CommandConnect;
-import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandProducer;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.mockito.ArgumentMatcher;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class ServerCnxAuthorizationTest {
- private final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
- private final String CLIENT_PRINCIPAL = "client";
- private final String PROXY_PRINCIPAL = "proxy";
- private final String CLIENT_TOKEN =
Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
- private final String PROXY_TOKEN =
Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
-
- private ServiceConfiguration svcConfig;
-
- protected PulsarTestContext pulsarTestContext;
- private BrokerService brokerService;
-
- @BeforeMethod(alwaysRun = true)
- public void beforeMethod() throws Exception {
- svcConfig = new ServiceConfiguration();
- svcConfig.setKeepAliveIntervalSeconds(0);
- svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- svcConfig.setClusterName("pulsar-cluster");
- svcConfig.setSuperUserRoles(Collections.singleton(PROXY_PRINCIPAL));
- svcConfig.setAuthenticationEnabled(true);
-
svcConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
- svcConfig.setAuthorizationEnabled(true);
-
svcConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
- Properties properties = new Properties();
- properties.setProperty("tokenSecretKey", "data:;base64,"
- + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
- svcConfig.setProperties(properties);
- pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
- .config(svcConfig)
- .spyByDefault()
- .build();
- brokerService = pulsarTestContext.getBrokerService();
-
-
pulsarTestContext.getPulsarResources().getTenantResources().createTenant("public",
- TenantInfo.builder().build());
- }
-
- @AfterMethod(alwaysRun = true)
- public void cleanup() throws Exception {
- if (pulsarTestContext != null) {
- pulsarTestContext.close();
- pulsarTestContext = null;
- }
- }
-
- @Test
- public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy()
throws Exception {
- svcConfig.setAuthenticateOriginalAuthData(true);
-
-
- ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
- ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
- Channel channel = mock(Channel.class);
- ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
- doReturn(channelPipeline).when(channel).pipeline();
-
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
- SocketAddress socketAddress = new InetSocketAddress(0);
- doReturn(socketAddress).when(channel).remoteAddress();
- doReturn(channel).when(channelHandlerContext).channel();
- channelHandlerContext.channel().remoteAddress();
- serverCnx.channelActive(channelHandlerContext);
-
- // connect
- AuthenticationToken clientAuthenticationToken = new
AuthenticationToken(CLIENT_TOKEN);
- AuthenticationToken proxyAuthenticationToken = new
AuthenticationToken(PROXY_TOKEN);
- CommandConnect connect = new CommandConnect();
-
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
-
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
- connect.setClientVersion("test");
- connect.setProtocolVersion(1);
- connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
-
connect.setOriginalAuthData(clientAuthenticationToken.getAuthData().getCommandData());
-
connect.setOriginalAuthMethod(clientAuthenticationToken.getAuthMethodName());
-
- serverCnx.handleConnect(connect);
- assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
- clientAuthenticationToken.getAuthData().getCommandData());
- assertEquals(serverCnx.getOriginalAuthState().getAuthRole(),
CLIENT_PRINCIPAL);
- assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
- assertEquals(serverCnx.getAuthData().getCommandData(),
- proxyAuthenticationToken.getAuthData().getCommandData());
- assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
- assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
-
- AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- pulsarTestContext.getPulsarResources());
-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
- // lookup
- CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
- TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
- commandLookupTopic.setTopic(topicName.toString());
- commandLookupTopic.setRequestId(1);
- serverCnx.handleLookup(commandLookupTopic);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- CLIENT_PRINCIPAL,
- serverCnx.getOriginalAuthData());
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- PROXY_PRINCIPAL,
- serverCnx.getAuthData());
-
- // producer
- CommandProducer commandProducer = new CommandProducer();
- commandProducer.setRequestId(1);
- commandProducer.setProducerId(1);
- commandProducer.setProducerName("test-producer");
- commandProducer.setTopic(topicName.toString());
- serverCnx.handleProducer(commandProducer);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
- CLIENT_PRINCIPAL,
- serverCnx.getOriginalAuthData());
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- PROXY_PRINCIPAL,
- serverCnx.getAuthData());
-
- // consumer
- CommandSubscribe commandSubscribe = new CommandSubscribe();
- commandSubscribe.setTopic(topicName.toString());
- commandSubscribe.setRequestId(1);
- commandSubscribe.setConsumerId(1);
- final String subscriptionName = "test-subscribe";
- commandSubscribe.setSubscription("test-subscribe");
- commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
- serverCnx.handleSubscribe(commandSubscribe);
-
- verify(authorizationService, times(1)).allowTopicOperationAsync(
- eq(topicName), eq(TopicOperation.CONSUME),
- eq(CLIENT_PRINCIPAL), argThat(arg -> {
- assertTrue(arg instanceof AuthenticationDataSubscription);
- try {
- assertEquals(arg.getCommandData(),
clientAuthenticationToken.getAuthData().getCommandData());
- } catch (PulsarClientException e) {
- fail(e.getMessage());
- }
- assertEquals(arg.getSubscription(), subscriptionName);
- return true;
- }));
- verify(authorizationService, times(1)).allowTopicOperationAsync(
- eq(topicName), eq(TopicOperation.CONSUME),
- eq(PROXY_PRINCIPAL), argThat(arg -> {
- assertTrue(arg instanceof AuthenticationDataSubscription);
- try {
- assertEquals(arg.getCommandData(),
proxyAuthenticationToken.getAuthData().getCommandData());
- } catch (PulsarClientException e) {
- fail(e.getMessage());
- }
- assertEquals(arg.getSubscription(), subscriptionName);
- return true;
- }));
- }
-
- @Test
- public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy()
throws Exception {
- svcConfig.setAuthenticateOriginalAuthData(false);
-
- ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
- ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
- Channel channel = mock(Channel.class);
- ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
- doReturn(channelPipeline).when(channel).pipeline();
-
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
- SocketAddress socketAddress = new InetSocketAddress(0);
- doReturn(socketAddress).when(channel).remoteAddress();
- doReturn(channel).when(channelHandlerContext).channel();
- channelHandlerContext.channel().remoteAddress();
- serverCnx.channelActive(channelHandlerContext);
-
- // connect
- AuthenticationToken proxyAuthenticationToken = new
AuthenticationToken(PROXY_TOKEN);
- CommandConnect connect = new CommandConnect();
-
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
-
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
- connect.setClientVersion("test");
- connect.setProtocolVersion(1);
- connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
- serverCnx.handleConnect(connect);
- assertNull(serverCnx.getOriginalAuthData());
- assertNull(serverCnx.getOriginalAuthState());
- assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
- assertEquals(serverCnx.getAuthData().getCommandData(),
- proxyAuthenticationToken.getAuthData().getCommandData());
- assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
- assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
-
- AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- pulsarTestContext.getPulsarResources());
-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
- // lookup
- CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
- TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
- commandLookupTopic.setTopic(topicName.toString());
- commandLookupTopic.setRequestId(1);
- serverCnx.handleLookup(commandLookupTopic);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- CLIENT_PRINCIPAL,
- serverCnx.getAuthData());
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- PROXY_PRINCIPAL,
- serverCnx.getAuthData());
-
- // producer
- CommandProducer commandProducer = new CommandProducer();
- commandProducer.setRequestId(1);
- commandProducer.setProducerId(1);
- commandProducer.setProducerName("test-producer");
- commandProducer.setTopic(topicName.toString());
- serverCnx.handleProducer(commandProducer);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
- CLIENT_PRINCIPAL,
- serverCnx.getAuthData());
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- PROXY_PRINCIPAL,
- serverCnx.getAuthData());
-
- // consumer
- CommandSubscribe commandSubscribe = new CommandSubscribe();
- commandSubscribe.setTopic(topicName.toString());
- commandSubscribe.setRequestId(1);
- commandSubscribe.setConsumerId(1);
- final String subscriptionName = "test-subscribe";
- commandSubscribe.setSubscription("test-subscribe");
- commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
- serverCnx.handleSubscribe(commandSubscribe);
-
- ArgumentMatcher<AuthenticationDataSource>
authenticationDataSourceArgumentMatcher = arg -> {
- assertTrue(arg instanceof AuthenticationDataSubscription);
- try {
- assertEquals(arg.getCommandData(),
proxyAuthenticationToken.getAuthData().getCommandData());
- } catch (PulsarClientException e) {
- fail(e.getMessage());
- }
- assertEquals(arg.getSubscription(), subscriptionName);
- return true;
- };
-
- verify(authorizationService, times(1)).allowTopicOperationAsync(
- eq(topicName), eq(TopicOperation.CONSUME),
- eq(CLIENT_PRINCIPAL),
argThat(authenticationDataSourceArgumentMatcher));
- verify(authorizationService, times(1)).allowTopicOperationAsync(
- eq(topicName), eq(TopicOperation.CONSUME),
- eq(PROXY_PRINCIPAL),
argThat(authenticationDataSourceArgumentMatcher));
- }
-
- @Test
- public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker()
throws Exception {
- ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
-
- ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
- Channel channel = mock(Channel.class);
- ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
- doReturn(channelPipeline).when(channel).pipeline();
-
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
- SocketAddress socketAddress = new InetSocketAddress(0);
- doReturn(socketAddress).when(channel).remoteAddress();
- doReturn(channel).when(channelHandlerContext).channel();
- channelHandlerContext.channel().remoteAddress();
- serverCnx.channelActive(channelHandlerContext);
-
- // connect
- AuthenticationToken clientAuthenticationToken = new
AuthenticationToken(CLIENT_TOKEN);
- CommandConnect connect = new CommandConnect();
-
connect.setAuthMethodName(clientAuthenticationToken.getAuthMethodName());
-
connect.setAuthData(clientAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
- connect.setClientVersion("test");
- connect.setProtocolVersion(1);
- serverCnx.handleConnect(connect);
- assertNull(serverCnx.getOriginalAuthData());
- assertNull(serverCnx.getOriginalAuthState());
- assertNull(serverCnx.getOriginalPrincipal());
- assertEquals(serverCnx.getAuthData().getCommandData(),
- clientAuthenticationToken.getAuthData().getCommandData());
- assertEquals(serverCnx.getAuthRole(), CLIENT_PRINCIPAL);
- assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
-
- AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- pulsarTestContext.getPulsarResources());
-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
- // lookup
- CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
- TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
- commandLookupTopic.setTopic(topicName.toString());
- commandLookupTopic.setRequestId(1);
- serverCnx.handleLookup(commandLookupTopic);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
- CLIENT_PRINCIPAL,
- serverCnx.getAuthData());
-
- // producer
- CommandProducer commandProducer = new CommandProducer();
- commandProducer.setRequestId(1);
- commandProducer.setProducerId(1);
- commandProducer.setProducerName("test-producer");
- commandProducer.setTopic(topicName.toString());
- serverCnx.handleProducer(commandProducer);
- verify(authorizationService,
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
- CLIENT_PRINCIPAL,
- serverCnx.getAuthData());
-
- // consumer
- CommandSubscribe commandSubscribe = new CommandSubscribe();
- commandSubscribe.setTopic(topicName.toString());
- commandSubscribe.setRequestId(1);
- commandSubscribe.setConsumerId(1);
- final String subscriptionName = "test-subscribe";
- commandSubscribe.setSubscription("test-subscribe");
- commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
- serverCnx.handleSubscribe(commandSubscribe);
-
- verify(authorizationService, times(1)).allowTopicOperationAsync(
- eq(topicName), eq(TopicOperation.CONSUME),
- eq(CLIENT_PRINCIPAL), argThat(arg -> {
- assertTrue(arg instanceof AuthenticationDataSubscription);
- try {
- assertEquals(arg.getCommandData(),
clientAuthenticationToken.getAuthData().getCommandData());
- } catch (PulsarClientException e) {
- fail(e.getMessage());
- }
- assertEquals(arg.getSubscription(), subscriptionName);
- return true;
- }));
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 3dcea7e4bd7..5b45a16d3dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -20,11 +20,14 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -71,13 +74,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -107,6 +110,7 @@ import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataRespons
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandSuccess;
@@ -120,6 +124,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
@@ -357,34 +362,117 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testConnectCommandWithAuthenticationPositive() throws
Exception {
AuthenticationService authenticationService =
mock(AuthenticationService.class);
- AuthenticationProvider authenticationProvider =
mock(AuthenticationProvider.class);
- AuthenticationState authenticationState =
mock(AuthenticationState.class);
- AuthData authData = AuthData.of(null);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
-
doReturn(authenticationService).when(brokerService).getAuthenticationService();
-
doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
- doReturn(authenticationState).when(authenticationProvider)
- .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
- doReturn(authData).when(authenticationState)
- .authenticate(authData);
- doReturn(true).when(authenticationState)
- .isComplete();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
- doReturn("appid1").when(authenticationState)
- .getAuthRole();
+ // test server response to CONNECT
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.client", null);
+ channel.writeInbound(clientCommand);
+
+ assertTrue(getResponse() instanceof CommandConnected);
+ assertEquals(serverCnx.getState(), State.Connected);
+ assertEquals(serverCnx.getPrincipal(), "pass.client");
+ assertTrue(serverCnx.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void
testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAuthData()
throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(true);
resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);
- // test server response to CONNECT
- ByteBuf clientCommand = Commands.newConnect("none", "", null);
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.client", "");
channel.writeInbound(clientCommand);
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandConnected);
assertEquals(serverCnx.getState(), State.Connected);
- assertTrue(getResponse() instanceof CommandConnected);
+ assertEquals(serverCnx.getAuthRole(), "pass.client");
+ assertEquals(serverCnx.getPrincipal(), "pass.client");
+ assertNull(serverCnx.getOriginalPrincipal());
+ assertTrue(serverCnx.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithPassingOriginalAuthData() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1, null,
+ null, "client", "pass.client", authMethodName);
+ channel.writeInbound(clientCommand);
+
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandConnected);
+ assertEquals(serverCnx.getState(), State.Connected);
+ // Note that this value will change to the client's data if the broker
sends an AuthChallenge to the
+ // proxy/client. Details described here
https://github.com/apache/pulsar/issues/19332.
+ assertEquals(serverCnx.getAuthRole(), "pass.proxy");
+ // These are all taken without verifying the auth data
+ assertEquals(serverCnx.getPrincipal(), "pass.client");
+ assertEquals(serverCnx.getOriginalPrincipal(), "pass.client");
+ assertTrue(serverCnx.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithPassingOriginalPrincipal() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(false);
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1, null,
+ null, "client", "pass.client", authMethodName);
+ channel.writeInbound(clientCommand);
+
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandConnected);
+ assertEquals(serverCnx.getState(), State.Connected);
+ assertEquals(serverCnx.getAuthRole(), "pass.proxy");
+ // These are all taken without verifying the auth data
+ assertEquals(serverCnx.getPrincipal(), "client");
+ assertEquals(serverCnx.getOriginalPrincipal(), "client");
+ assertTrue(serverCnx.isActive());
channel.finish();
}
@@ -418,7 +506,7 @@ public class ServerCnxTest {
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
- svcConfig.setProxyRoles(Collections.singleton("proxy"));
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
resetChannel();
assertTrue(channel.isActive());
@@ -428,14 +516,9 @@ public class ServerCnxTest {
null, "client", "fail", authMethodName);
channel.writeInbound(clientCommand);
- // We currently expect two responses because the originalAuthData is
verified after sending
- // a successful response to the proxy. Because this is a synchronous
operation, there is currently
- // no risk. It would be better to fix this. See
https://github.com/apache/pulsar/issues/19311.
Object response1 = getResponse();
- assertTrue(response1 instanceof CommandConnected);
- Object response2 = getResponse();
- assertTrue(response2 instanceof CommandError);
- assertEquals(((CommandError) response2).getMessage(), "Unable to
authenticate");
+ assertTrue(response1 instanceof CommandError);
+ assertEquals(((CommandError) response1).getMessage(), "Failed to
authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
@@ -461,6 +544,7 @@ public class ServerCnxTest {
Object challenge1 = getResponse();
assertTrue(challenge1 instanceof CommandAuthChallenge);
+ assertEquals(serverCnx.getState(), State.Connecting);
// Trigger another AuthChallenge to verify that code path continues to
challenge
ByteBuf authResponse1 =
@@ -469,6 +553,7 @@ public class ServerCnxTest {
Object challenge2 = getResponse();
assertTrue(challenge2 instanceof CommandAuthChallenge);
+ assertEquals(serverCnx.getState(), State.Connecting);
// Trigger failure
ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName,
AuthData.of("fail.client".getBytes()), 1, "1");
@@ -476,12 +561,320 @@ public class ServerCnxTest {
Object response3 = getResponse();
assertTrue(response3 instanceof CommandError);
- assertEquals(((CommandError) response3).getMessage(), "Unable to
authenticate");
+ assertEquals(((CommandError) response3).getMessage(), "Failed to
authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testOriginalAuthDataTriggersAuthChallengeFailure() throws
Exception {
+ // Test verifies the current behavior in the absence of a solution for
+ // https://github.com/apache/pulsar/issues/19291. When that issue is
completed, we can update this test
+ // to correctly verify that behavior.
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockMultiStageAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // Trigger connect command to result in AuthChallenge
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1, "1",
+ "localhost", "client", "challenge.client", authMethodName);
+ channel.writeInbound(clientCommand);
+
+ Object response = getResponse();
+ assertTrue(response instanceof CommandError);
+
+ assertEquals(((CommandError) response).getMessage(), "Failed to
authenticate");
+ assertEquals(serverCnx.getState(), State.Failed);
+ assertFalse(serverCnx.isActive());
+ channel.finish();
+ }
+
+ // This test used to be in the ServerCnxAuthorizationTest class, but it
was migrated here because the mocking
+ // in that class was too extensive. There is some overlap with this test
and other tests in this class. The primary
+ // role of this test is verifying that the correct role and
AuthenticationDataSource are passed to the
+ // AuthorizationService.
+ @Test
+ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy()
throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.pass"));
+
+
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+ AuthorizationService authorizationService =
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ pulsarTestContext.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // Connect
+ // This client role integrates with the MockAuthenticationProvider and
MockAuthorizationProvider
+ // to pass authentication and fail authorization
+ String proxyRole = "pass.pass";
+ String clientRole = "pass.fail";
+ // Submit a failing originalPrincipal to show that it is not used at
all.
+ ByteBuf connect = Commands.newConnect(authMethodName, proxyRole,
"test", "localhost",
+ "fail.fail", clientRole, authMethodName);
+ channel.writeInbound(connect);
+ Object connectResponse = getResponse();
+ assertTrue(connectResponse instanceof CommandConnected);
+ assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
clientRole);
+ assertEquals(serverCnx.getOriginalAuthState().getAuthRole(),
clientRole);
+ assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
+ assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
+ assertEquals(serverCnx.getAuthRole(), proxyRole);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
+
+ // Lookup
+ TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
+ ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+ channel.writeInbound(lookup);
+ Object lookupResponse = getResponse();
+ assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+ assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandLookupTopicResponse)
lookupResponse).getRequestId(), 1);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
proxyRole, serverCnx.getAuthData());
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
clientRole, serverCnx.getOriginalAuthData());
+
+ // producer
+ ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2,
"test-producer", new HashMap<>(), false);
+ channel.writeInbound(producer);
+ Object producerResponse = getResponse();
+ assertTrue(producerResponse instanceof CommandError);
+ assertEquals(((CommandError) producerResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
clientRole, serverCnx.getOriginalAuthData());
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
proxyRole, serverCnx.getAuthData());
+
+ // consumer
+ String subscriptionName = "test-subscribe";
+ ByteBuf subscribe = Commands.newSubscribe(topicName.toString(),
subscriptionName, 1, 3,
+ CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+ channel.writeInbound(subscribe);
+ Object subscribeResponse = getResponse();
+ assertTrue(subscribeResponse instanceof CommandError);
+ assertEquals(((CommandError) subscribeResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(clientRole), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ assertEquals(arg.getCommandData(), clientRole);
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(proxyRole), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ assertEquals(arg.getCommandData(), proxyRole);
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ }
+
+ // This test used to be in the ServerCnxAuthorizationTest class, but it
was migrated here because the mocking
+ // in that class was too extensive. There is some overlap with this test
and other tests in this class. The primary
+ // role of this test is verifying that the correct role and
AuthenticationDataSource are passed to the
+ // AuthorizationService.
+ public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy()
throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(false);
+ svcConfig.setProxyRoles(Collections.singleton("pass.pass"));
+
+
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+ AuthorizationService authorizationService =
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ pulsarTestContext.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // Connect
+ // This client role integrates with the MockAuthenticationProvider and
MockAuthorizationProvider
+ // to pass authentication and fail authorization
+ String proxyRole = "pass.pass";
+ String clientRole = "pass.fail";
+ ByteBuf connect = Commands.newConnect(authMethodName, proxyRole,
"test", "localhost",
+ clientRole, null, null);
+ channel.writeInbound(connect);
+ Object connectResponse = getResponse();
+ assertTrue(connectResponse instanceof CommandConnected);
+ assertNull(serverCnx.getOriginalAuthData());
+ assertNull(serverCnx.getOriginalAuthState());
+ assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
+ assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
+ assertEquals(serverCnx.getAuthRole(), proxyRole);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
+
+ // Lookup
+ TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
+ ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+ channel.writeInbound(lookup);
+ Object lookupResponse = getResponse();
+ assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+ assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandLookupTopicResponse)
lookupResponse).getRequestId(), 1);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
proxyRole, serverCnx.getAuthData());
+ // This test is an example of
https://github.com/apache/pulsar/issues/19332. Essentially, we're passing
+ // the proxy's auth data because it is all we have. This test should
be updated when we resolve that issue.
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
clientRole, serverCnx.getAuthData());
+
+ // producer
+ ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2,
"test-producer", new HashMap<>(), false);
+ channel.writeInbound(producer);
+ Object producerResponse = getResponse();
+ assertTrue(producerResponse instanceof CommandError);
+ assertEquals(((CommandError) producerResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+ // See https://github.com/apache/pulsar/issues/19332 for justification
of this assertion.
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
clientRole, serverCnx.getAuthData());
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
proxyRole, serverCnx.getAuthData());
+
+ // consumer
+ String subscriptionName = "test-subscribe";
+ ByteBuf subscribe = Commands.newSubscribe(topicName.toString(),
subscriptionName, 1, 3,
+ CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+ channel.writeInbound(subscribe);
+ Object subscribeResponse = getResponse();
+ assertTrue(subscribeResponse instanceof CommandError);
+ assertEquals(((CommandError) subscribeResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(clientRole), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ // We assert that the role is clientRole and commandData
is proxyRole due to
+ // https://github.com/apache/pulsar/issues/19332.
+ assertEquals(arg.getCommandData(), proxyRole);
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(proxyRole), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ assertEquals(arg.getCommandData(), proxyRole);
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ }
+
+ // This test used to be in the ServerCnxAuthorizationTest class, but it
was migrated here because the mocking
+ // in that class was too extensive. There is some overlap with this test
and other tests in this class. The primary
+ // role of this test is verifying that the correct role and
AuthenticationDataSource are passed to the
+ // AuthorizationService.
+ @Test
+ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker()
throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+ AuthorizationService authorizationService =
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ pulsarTestContext.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // connect
+ // This client role integrates with the MockAuthenticationProvider and
MockAuthorizationProvider
+ // to pass authentication and fail authorization
+ String clientRole = "pass.fail";
+ ByteBuf connect = Commands.newConnect(authMethodName, clientRole,
"test");
+ channel.writeInbound(connect);
+
+ Object connectResponse = getResponse();
+ assertTrue(connectResponse instanceof CommandConnected);
+ assertNull(serverCnx.getOriginalAuthData());
+ assertNull(serverCnx.getOriginalAuthState());
+ assertNull(serverCnx.getOriginalPrincipal());
+ assertEquals(serverCnx.getAuthData().getCommandData(), clientRole);
+ assertEquals(serverCnx.getAuthRole(), clientRole);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), clientRole);
+
+ // lookup
+ TopicName topicName =
TopicName.get("persistent://public/default/test-topic");
+ ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+ channel.writeInbound(lookup);
+ Object lookupResponse = getResponse();
+ assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+ assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandLookupTopicResponse)
lookupResponse).getRequestId(), 1);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
clientRole, serverCnx.getAuthData());
+
+ // producer
+ ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2,
"test-producer", new HashMap<>(), false);
+ channel.writeInbound(producer);
+ Object producerResponse = getResponse();
+ assertTrue(producerResponse instanceof CommandError);
+ assertEquals(((CommandError) producerResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
clientRole, serverCnx.getAuthData());
+
+ // consumer
+ String subscriptionName = "test-subscribe";
+ ByteBuf subscribe = Commands.newSubscribe(topicName.toString(),
subscriptionName, 1, 3,
+ CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+ channel.writeInbound(subscribe);
+ Object subscribeResponse = getResponse();
+ assertTrue(subscribeResponse instanceof CommandError);
+ assertEquals(((CommandError) subscribeResponse).getError(),
ServerError.AuthorizationError);
+ assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(clientRole), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ assertEquals(arg.getCommandData(), clientRole);
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ }
+
@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 4233c5cfdfa..9119ffed4e2 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -209,7 +209,7 @@ public class TestPulsarAuth extends
MockedPulsarServiceBaseTest {
Assert.fail(); // should fail
} catch (TrinoException e){
Assert.assertEquals(PERMISSION_DENIED.toErrorCode(),
e.getErrorCode());
- Assert.assertTrue(e.getMessage().contains("Unable to
authenticate"));
+ Assert.assertTrue(e.getMessage().contains("Failed to
authenticate"));
}
pulsarAuth.cleanSession(session);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 3f0a6e0338f..0a9bb5e1959 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -141,7 +141,7 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
// Authorization error
assertEquals(e.getResult().getExitCode(), 1);
log.info(e.getResult().getStderr());
- assertTrue(e.getResult().getStderr().contains("Unable
to authenticate"));
+ assertTrue(e.getResult().getStderr().contains("Failed
to authenticate"));
}
}
);