This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 716db378453 [improve][txn] Cleanup how superusers abort txns (#19976)
716db378453 is described below

commit 716db378453fff6aa19b898b91ac1582c701fb70
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Apr 7 00:34:39 2023 -0500

    [improve][txn] Cleanup how superusers abort txns (#19976)
    
    This PR builds on https://github.com/apache/pulsar/pull/19467. When we
    modify/abort transactions, we need to make sure that authorization is
    checked for both the proxy and the client.
    
    * Add a second authorization check when `originalPrincipal` is set in the
      `ServerCnx`.
    * Fix a bug where we were not doing a deep copy of the `SubscriptionsList`
      object. (Tests caught this bug!)
    
    Added a new test to cover some of the changes.
    
    This is an internal change.
    
    - [x] `doc-not-needed`
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/38
    
    (cherry picked from commit f76beda21134aff24cc0384e5447ffbe33c5c039)
    (cherry picked from commit 5a180f78d7636537198a758e1c9416e58d80bf42)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 49 +++++++++++++---------
 .../pulsar/broker/service/ServerCnxTest.java       | 46 ++++++++++++++++++++
 .../broker/service/utils/ClientChannelHelper.java  |  7 ++++
 3 files changed, 83 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c5359108f16..77c46897bab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import io.prometheus.client.Gauge;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
@@ -2325,32 +2326,39 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 });
     }
 
-    private CompletableFuture<Boolean> 
verifyTxnOwnershipForTCToBrokerCommands() {
+    private CompletableFuture<Boolean> isSuperUser() {
+        assert ctx.executor().inEventLoop();
         if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
-            return getBrokerService()
-                    .getAuthorizationService()
-                    .isSuperUser(getPrincipal(), getAuthenticationData());
+            CompletableFuture<Boolean> isAuthRoleAuthorized = 
service.getAuthorizationService().isSuperUser(
+                    authRole, authenticationData);
+            if (originalPrincipal != null) {
+                CompletableFuture<Boolean> isOriginalPrincipalAuthorized = 
service.getAuthorizationService()
+                        .isSuperUser(originalPrincipal,
+                                originalAuthData != null ? originalAuthData : 
authenticationData);
+                return 
isOriginalPrincipalAuthorized.thenCombine(isAuthRoleAuthorized,
+                        (originalPrincipal, authRole) -> originalPrincipal && 
authRole);
+            } else {
+                return isAuthRoleAuthorized;
+            }
         } else {
             return CompletableFuture.completedFuture(true);
         }
     }
 
     private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
-        final String checkOwner = getPrincipal();
+        assert ctx.executor().inEventLoop();
         return service.pulsar().getTransactionMetadataStoreService()
-                .verifyTxnOwnership(txnID, checkOwner)
-                .thenCompose(isOwner -> {
+                .verifyTxnOwnership(txnID, getPrincipal())
+                .thenComposeAsync(isOwner -> {
                     if (isOwner) {
                         return CompletableFuture.completedFuture(true);
                     }
                     if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
-                        return getBrokerService()
-                                .getAuthorizationService()
-                                .isSuperUser(checkOwner, 
getAuthenticationData());
+                        return isSuperUser();
                     } else {
                         return CompletableFuture.completedFuture(false);
                     }
-                });
+                }, ctx.executor());
     }
 
     @Override
@@ -2366,10 +2374,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     txnID, txnAction);
         }
         CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
-        topicFuture.thenAccept(optionalTopic -> {
+        topicFuture.thenAcceptAsync(optionalTopic -> {
             if (optionalTopic.isPresent()) {
-                // we only accept super user becase this endpoint is reserved 
for tc to broker communication
-                verifyTxnOwnershipForTCToBrokerCommands()
+                // we only accept superuser because this endpoint is reserved 
for tc to broker communication
+                isSuperUser()
                         .thenCompose(isOwner -> {
                             if (!isOwner) {
                                 return failedFutureTxnTcNotAllowed(txnID);
@@ -2419,7 +2427,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     return null;
                 });
             }
-        }).exceptionally(e -> {
+        }, ctx.executor()).exceptionally(e -> {
             log.error("handleEndTxnOnPartition fail ! topic {}, "
                             + "txnId: [{}], txnAction: [{}]", topic, txnID,
                     TxnAction.valueOf(txnAction), e.getCause());
@@ -2447,7 +2455,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
-        topicFuture.thenAccept(optionalTopic -> {
+        topicFuture.thenAcceptAsync(optionalTopic -> {
             if (optionalTopic.isPresent()) {
                 Subscription subscription = 
optionalTopic.get().getSubscription(subName);
                 if (subscription == null) {
@@ -2459,7 +2467,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     return;
                 }
                 // we only accept super user becase this endpoint is reserved 
for tc to broker communication
-                verifyTxnOwnershipForTCToBrokerCommands()
+                isSuperUser()
                         .thenCompose(isOwner -> {
                             if (!isOwner) {
                                 return failedFutureTxnTcNotAllowed(txnID);
@@ -2509,7 +2517,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     return null;
                 });
             }
-        }).exceptionally(e -> {
+        }, ctx.executor()).exceptionally(e -> {
             log.error("handleEndTxnOnSubscription fail ! topic: {}, 
subscription: {}"
                             + "txnId: [{}], txnAction: [{}]", topic, subName,
                     txnID, TxnAction.valueOf(txnAction), e.getCause());
@@ -2544,7 +2552,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn 
command) {
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
         final long requestId = command.getRequestId();
-        final List<org.apache.pulsar.common.api.proto.Subscription> 
subscriptionsList = command.getSubscriptionsList();
+        final List<org.apache.pulsar.common.api.proto.Subscription> 
subscriptionsList = new ArrayList<>();
+        for (org.apache.pulsar.common.api.proto.Subscription sub : 
command.getSubscriptionsList()) {
+            subscriptionsList.add(new 
org.apache.pulsar.common.api.proto.Subscription().copyFrom(sub));
+        }
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from 
{} with txnId {}",
                     requestId, remoteAddress, txnID);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 96dd8a6cf10..5e4cd2fd0d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -75,7 +76,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
@@ -93,11 +96,13 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
@@ -2748,4 +2753,45 @@ public class ServerCnxTest {
         verify(authResponse, times(1)).hasClientVersion();
         verify(authResponse, times(0)).getClientVersion();
     }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponseFailedAuth() throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.fail"));
+
+        
svcConfig.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig,
+                        pulsar.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(false));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+
+        ByteBuf connect = Commands.newConnect(authMethodName, "pass.fail", 
"test", "localhost",
+                "pass.pass", "pass.pass", authMethodName);
+        channel.writeInbound(connect);
+        Object connectResponse = getResponse();
+        assertTrue(connectResponse instanceof CommandConnected);
+
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                Collections.singletonList("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = 
(CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getError(), ServerError.TransactionNotFound);
+        verify(txnStore, never()).addProducedPartitionToTxn(any(TxnID.class), 
any());
+
+        channel.finish();
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 5b9bd5e74ac..779a0bdba08 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.utils;
 
 import java.util.Queue;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -155,6 +156,12 @@ public class ClientChannelHelper {
         protected void handleLookupResponse(CommandLookupTopicResponse 
connection) {
             queue.offer(new CommandLookupTopicResponse().copyFrom(connection));
         }
+
+        @Override
+        protected void handleAddPartitionToTxnResponse(
+                CommandAddPartitionToTxnResponse 
commandAddPartitionToTxnResponse) {
+            queue.offer(new 
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
+        }
     };
 
 }

Reply via email to