This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new bf513e24432 [Branch-2.9] Fix passing incorrect authentication data 
(#16347)
bf513e24432 is described below

commit bf513e24432d51b079f0a9ad5da699d48c2d58b2
Author: Qiang Zhao <[email protected]>
AuthorDate: Sat Jul 2 22:23:44 2022 +0800

    [Branch-2.9] Fix passing incorrect authentication data (#16347)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  47 ++-
 .../broker/service/ServerCnxAuthorizationTest.java | 433 +++++++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       |   2 +-
 3 files changed, 474 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 16cab0a13a9..0bbdaca9f74 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -376,19 +376,21 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     // ////
 
     private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName 
topicName, TopicOperation operation,
-                    AuthenticationDataSource authData) {
+                                                               
AuthenticationDataSource authDataSource,
+                                                               
AuthenticationDataSource originalAuthDataSource) {
         if (!service.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (originalPrincipal != null) {
             isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
-                topicName, operation, originalPrincipal, authData);
+                    topicName, operation, originalPrincipal,
+                    originalAuthDataSource != null ? originalAuthDataSource : 
authDataSource);
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
         CompletableFuture<Boolean> isAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
-            topicName, operation, authRole, authData);
+                topicName, operation, authRole, authDataSource);
         return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, 
(isProxyAuthorized, isAuthorized) -> {
             if (!isProxyAuthorized) {
                 log.warn("OriginalRole {} is not authorized to perform 
operation {} on topic {}",
@@ -407,7 +409,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (service.isAuthorizationEnabled()) {
             AuthenticationDataSource authData =
                     new 
AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
-            return isTopicOperationAllowed(topicName, operation, authData);
+            AuthenticationDataSource authDataSource =
+                    new AuthenticationDataSubscription(authenticationData, 
subscriptionName);
+            AuthenticationDataSource originalAuthDataSource = null;
+            if (originalAuthData != null) {
+                originalAuthDataSource = new 
AuthenticationDataSubscription(originalAuthData, subscriptionName);
+            }
+            return isTopicOperationAllowed(topicName, operation, 
authDataSource, originalAuthDataSource);
         } else {
             return CompletableFuture.completedFuture(true);
         }
@@ -442,7 +450,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 lookupSemaphore.release();
                 return;
             }
-            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
getAuthenticationData()).thenApply(
+            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, 
authoritative,
@@ -506,7 +514,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 lookupSemaphore.release();
                 return;
             }
-            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
getAuthenticationData()).thenApply(
+            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
                     
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
@@ -1161,7 +1169,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         CompletableFuture<Boolean> isAuthorizedFuture = 
isTopicOperationAllowed(
-                topicName, TopicOperation.PRODUCE, getAuthenticationData()
+                topicName, TopicOperation.PRODUCE, authenticationData, 
originalAuthData
         );
         isAuthorizedFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
@@ -2837,4 +2845,29 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     public boolean hasProducers() {
         return !producers.isEmpty();
     }
+
+    @VisibleForTesting
+    protected String getOriginalPrincipal() {
+        return originalPrincipal;
+    }
+
+    @VisibleForTesting
+    protected AuthenticationDataSource getAuthData() {
+        return authenticationData;
+    }
+
+    @VisibleForTesting
+    protected AuthenticationDataSource getOriginalAuthData() {
+        return originalAuthData;
+    }
+
+    @VisibleForTesting
+    protected AuthenticationState getOriginalAuthState() {
+        return originalAuthState;
+    }
+
+    @VisibleForTesting
+    protected void setAuthRole(String authRole) {
+        this.authRole = authRole;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
new file mode 100644
index 00000000000..0d4580d044c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import javax.crypto.SecretKey;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.CommandProducer;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
+import org.mockito.ArgumentMatcher;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ServerCnxAuthorizationTest {
+    private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private final String CLIENT_PRINCIPAL = "client";
+    private final String PROXY_PRINCIPAL = "proxy";
+    private final String CLIENT_TOKEN = 
Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
+    private final String PROXY_TOKEN = 
Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
+
+    private PulsarService pulsar;
+    private PulsarResources pulsarResources;
+    private BrokerService brokerService;
+    private ServiceConfiguration svcConfig;
+
+    @BeforeMethod(alwaysRun = true)
+    public void beforeMethod() throws Exception {
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+        svcConfig = spy(ServiceConfiguration.class);
+        svcConfig.setKeepAliveIntervalSeconds(0);
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d);
+        svcConfig.setClusterName("pulsar-cluster");
+        svcConfig.setSuperUserRoles(Collections.singleton(PROXY_PRINCIPAL));
+        svcConfig.setAuthenticationEnabled(true);
+        
svcConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+        svcConfig.setAuthorizationEnabled(true);
+        
svcConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        svcConfig.setProperties(properties);
+
+        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
+        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
+
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+
+        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+
+        ZooKeeper mockZk = createMockZooKeeper();
+        OrderedExecutor executor = 
OrderedExecutor.newBuilder().numThreads(1).build();
+        doReturn(createMockBookKeeper(executor))
+                .when(pulsar).getBookKeeperClient();
+
+        MetadataStore store = new ZKMetadataStore(mockZk);
+
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+        NamespaceResources namespaceResources =
+                spyWithClassAndConstructorArgs(NamespaceResources.class, 
store, store, 30);
+        
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
+
+        TenantResources tenantResources = 
spyWithClassAndConstructorArgs(TenantResources.class, store, 30);
+        doReturn(tenantResources).when(pulsarResources).getTenantResources();
+
+        
doReturn(CompletableFuture.completedFuture(Optional.of(TenantInfo.builder().build()))).when(tenantResources)
+                .getTenantAsync("public");
+
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
+        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
+        doReturn(interceptor).when(brokerService).getInterceptor();
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        doReturn(executor).when(pulsar).getOrderedExecutor();
+    }
+
+    @Test
+    public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() 
throws Exception {
+        doReturn(true).when(svcConfig).isAuthenticateOriginalAuthData();
+
+        ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, 
pulsar);
+        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+        doReturn(channelPipeline).when(channel).pipeline();
+        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+        SocketAddress socketAddress = new InetSocketAddress(0);
+        doReturn(socketAddress).when(channel).remoteAddress();
+        doReturn(channel).when(channelHandlerContext).channel();
+        channelHandlerContext.channel().remoteAddress();
+        serverCnx.channelActive(channelHandlerContext);
+
+        // connect with the proxy token, forwarding the client principal and token
+        AuthenticationToken clientAuthenticationToken = new 
AuthenticationToken(CLIENT_TOKEN);
+        AuthenticationToken proxyAuthenticationToken = new 
AuthenticationToken(PROXY_TOKEN);
+        CommandConnect connect = new CommandConnect();
+        
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
+        
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+        connect.setClientVersion("test");
+        connect.setProtocolVersion(1);
+        connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
+        
connect.setOriginalAuthData(clientAuthenticationToken.getAuthData().getCommandData());
+        
connect.setOriginalAuthMethod(clientAuthenticationToken.getAuthMethodName());
+
+        serverCnx.handleConnect(connect);
+        assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
+                clientAuthenticationToken.getAuthData().getCommandData());
+        assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), 
CLIENT_PRINCIPAL);
+        assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
+        assertEquals(serverCnx.getAuthData().getCommandData(),
+                proxyAuthenticationToken.getAuthData().getCommandData());
+        assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
+
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsarResources);
+        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+        // lookup: client checked with original auth data, proxy with its own
+        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        commandLookupTopic.setTopic(topicName.toString());
+        commandLookupTopic.setRequestId(1);
+        serverCnx.handleLookup(commandLookupTopic);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+                CLIENT_PRINCIPAL,
+                serverCnx.getOriginalAuthData());
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+                PROXY_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // producer: the same pair of checks must be issued for PRODUCE
+        CommandProducer commandProducer = new CommandProducer();
+        commandProducer.setRequestId(1);
+        commandProducer.setProducerId(1);
+        commandProducer.setProducerName("test-producer");
+        commandProducer.setTopic(topicName.toString());
+        serverCnx.handleProducer(commandProducer);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+                CLIENT_PRINCIPAL,
+                serverCnx.getOriginalAuthData());
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+                PROXY_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // consumer: each auth data must arrive wrapped with the subscription name
+        CommandSubscribe commandSubscribe = new CommandSubscribe();
+        commandSubscribe.setTopic(topicName.toString());
+        commandSubscribe.setRequestId(1);
+        commandSubscribe.setConsumerId(1);
+        final String subscriptionName = "test-subscribe";
+        commandSubscribe.setSubscription("test-subscribe");
+        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+        serverCnx.handleSubscribe(commandSubscribe);
+
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(CLIENT_PRINCIPAL), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    try {
+                        assertEquals(arg.getCommandData(), 
clientAuthenticationToken.getAuthData().getCommandData());
+                    } catch (PulsarClientException e) {
+                        fail(e.getMessage());
+                    }
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(PROXY_PRINCIPAL), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    try {
+                        assertEquals(arg.getCommandData(), 
proxyAuthenticationToken.getAuthData().getCommandData());
+                    } catch (PulsarClientException e) {
+                        fail(e.getMessage());
+                    }
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+    }
+
+    @Test
+    public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() 
throws Exception {
+        doReturn(false).when(svcConfig).isAuthenticateOriginalAuthData();
+
+        ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, 
pulsar);
+        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+        doReturn(channelPipeline).when(channel).pipeline();
+        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+        SocketAddress socketAddress = new InetSocketAddress(0);
+        doReturn(socketAddress).when(channel).remoteAddress();
+        doReturn(channel).when(channelHandlerContext).channel();
+        channelHandlerContext.channel().remoteAddress();
+        serverCnx.channelActive(channelHandlerContext);
+
+        // connect with the proxy token, forwarding only the client principal
+        AuthenticationToken proxyAuthenticationToken = new 
AuthenticationToken(PROXY_TOKEN);
+        CommandConnect connect = new CommandConnect();
+        
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
+        
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+        connect.setClientVersion("test");
+        connect.setProtocolVersion(1);
+        connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
+        serverCnx.handleConnect(connect);
+        assertNull(serverCnx.getOriginalAuthData());
+        assertNull(serverCnx.getOriginalAuthState());
+        assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
+        assertEquals(serverCnx.getAuthData().getCommandData(),
+                proxyAuthenticationToken.getAuthData().getCommandData());
+        assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
+
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsarResources);
+        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+        // lookup: no original auth data, so both roles get the proxy auth data
+        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        commandLookupTopic.setTopic(topicName.toString());
+        commandLookupTopic.setRequestId(1);
+        serverCnx.handleLookup(commandLookupTopic);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+                CLIENT_PRINCIPAL,
+                serverCnx.getAuthData());
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+                PROXY_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // producer: the same pair of checks must be issued for PRODUCE
+        CommandProducer commandProducer = new CommandProducer();
+        commandProducer.setRequestId(1);
+        commandProducer.setProducerId(1);
+        commandProducer.setProducerName("test-producer");
+        commandProducer.setTopic(topicName.toString());
+        serverCnx.handleProducer(commandProducer);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+                CLIENT_PRINCIPAL,
+                serverCnx.getAuthData());
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+                PROXY_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // consumer: the proxy auth data must arrive wrapped with the subscription
+        CommandSubscribe commandSubscribe = new CommandSubscribe();
+        commandSubscribe.setTopic(topicName.toString());
+        commandSubscribe.setRequestId(1);
+        commandSubscribe.setConsumerId(1);
+        final String subscriptionName = "test-subscribe";
+        commandSubscribe.setSubscription("test-subscribe");
+        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+        serverCnx.handleSubscribe(commandSubscribe);
+
+        ArgumentMatcher<AuthenticationDataSource> 
authenticationDataSourceArgumentMatcher = arg -> {
+            assertTrue(arg instanceof AuthenticationDataSubscription);
+            try {
+                assertEquals(arg.getCommandData(), 
proxyAuthenticationToken.getAuthData().getCommandData());
+            } catch (PulsarClientException e) {
+                fail(e.getMessage());
+            }
+            assertEquals(arg.getSubscription(), subscriptionName);
+            return true;
+        };
+
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(CLIENT_PRINCIPAL), 
argThat(authenticationDataSourceArgumentMatcher));
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(PROXY_PRINCIPAL), 
argThat(authenticationDataSourceArgumentMatcher));
+    }
+
+    @Test
+    public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() 
throws Exception {
+        ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, 
pulsar);
+
+        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+        doReturn(channelPipeline).when(channel).pipeline();
+        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+        SocketAddress socketAddress = new InetSocketAddress(0);
+        doReturn(socketAddress).when(channel).remoteAddress();
+        doReturn(channel).when(channelHandlerContext).channel();
+        channelHandlerContext.channel().remoteAddress();
+        serverCnx.channelActive(channelHandlerContext);
+
+        // connect
+        AuthenticationToken clientAuthenticationToken = new 
AuthenticationToken(CLIENT_TOKEN);
+        CommandConnect connect = new CommandConnect();
+        
connect.setAuthMethodName(clientAuthenticationToken.getAuthMethodName());
+        
connect.setAuthData(clientAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+        connect.setClientVersion("test");
+        connect.setProtocolVersion(1);
+        serverCnx.handleConnect(connect);
+        assertNull(serverCnx.getOriginalAuthData());
+        assertNull(serverCnx.getOriginalAuthState());
+        assertNull(serverCnx.getOriginalPrincipal());
+        assertEquals(serverCnx.getAuthData().getCommandData(),
+                clientAuthenticationToken.getAuthData().getCommandData());
+        assertEquals(serverCnx.getAuthRole(), CLIENT_PRINCIPAL);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
+
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsarResources);
+        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+        // lookup
+        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        commandLookupTopic.setTopic(topicName.toString());
+        commandLookupTopic.setRequestId(1);
+        serverCnx.handleLookup(commandLookupTopic);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+                CLIENT_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // producer
+        CommandProducer commandProducer = new CommandProducer();
+        commandProducer.setRequestId(1);
+        commandProducer.setProducerId(1);
+        commandProducer.setProducerName("test-producer");
+        commandProducer.setTopic(topicName.toString());
+        serverCnx.handleProducer(commandProducer);
+        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+                CLIENT_PRINCIPAL,
+                serverCnx.getAuthData());
+
+        // consumer
+        CommandSubscribe commandSubscribe = new CommandSubscribe();
+        commandSubscribe.setTopic(topicName.toString());
+        commandSubscribe.setRequestId(1);
+        commandSubscribe.setConsumerId(1);
+        final String subscriptionName = "test-subscribe";
+        commandSubscribe.setSubscription("test-subscribe");
+        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+        serverCnx.handleSubscribe(commandSubscribe);
+
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(CLIENT_PRINCIPAL), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    try {
+                        assertEquals(arg.getCommandData(), 
clientAuthenticationToken.getAuthData().getCommandData());
+                    } catch (PulsarClientException e) {
+                        fail(e.getMessage());
+                    }
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index be77a50eea5..5615a28a457 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1550,7 +1550,7 @@ public class ServerCnxTest {
             channel.close().get();
         }
         serverCnx = new ServerCnx(pulsar);
-        serverCnx.authRole = "";
+        serverCnx.setAuthRole("");
         channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
                 MaxMessageSize,
                 0,

Reply via email to