This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 716db378453 [improve][txn] Cleanup how superusers abort txns (#19976)
716db378453 is described below
commit 716db378453fff6aa19b898b91ac1582c701fb70
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Apr 7 00:34:39 2023 -0500
[improve][txn] Cleanup how superusers abort txns (#19976)
This PR builds on https://github.com/apache/pulsar/pull/19467. When we
modify or abort transactions, we need to make sure that authorization is
checked for both the proxy role and the original client principal.
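* Add a second authorization check when `originalPrincipal` is set in the
  `ServerCnx`, so that both `authRole` and `originalPrincipal` must be
  superusers.
* Fix a bug where we were not doing a deep copy of the `SubscriptionsList`
  object in `handleAddSubscriptionToTxn`. (Tests caught this bug!)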
Added a new test, `ServerCnxTest#sendAddPartitionToTxnResponseFailedAuth`, to cover some of the changes.
This is an internal change.
- [x] `doc-not-needed`
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/38
(cherry picked from commit f76beda21134aff24cc0384e5447ffbe33c5c039)
(cherry picked from commit 5a180f78d7636537198a758e1c9416e58d80bf42)
---
.../apache/pulsar/broker/service/ServerCnx.java | 49 +++++++++++++---------
.../pulsar/broker/service/ServerCnxTest.java | 46 ++++++++++++++++++++
.../broker/service/utils/ClientChannelHelper.java | 7 ++++
3 files changed, 83 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c5359108f16..77c46897bab 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import io.prometheus.client.Gauge;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
@@ -2325,32 +2326,39 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
- private CompletableFuture<Boolean>
verifyTxnOwnershipForTCToBrokerCommands() {
+ private CompletableFuture<Boolean> isSuperUser() {
+ assert ctx.executor().inEventLoop();
if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
- return getBrokerService()
- .getAuthorizationService()
- .isSuperUser(getPrincipal(), getAuthenticationData());
+ CompletableFuture<Boolean> isAuthRoleAuthorized =
service.getAuthorizationService().isSuperUser(
+ authRole, authenticationData);
+ if (originalPrincipal != null) {
+ CompletableFuture<Boolean> isOriginalPrincipalAuthorized =
service.getAuthorizationService()
+ .isSuperUser(originalPrincipal,
+ originalAuthData != null ? originalAuthData :
authenticationData);
+ return
isOriginalPrincipalAuthorized.thenCombine(isAuthRoleAuthorized,
+ (originalPrincipal, authRole) -> originalPrincipal &&
authRole);
+ } else {
+ return isAuthRoleAuthorized;
+ }
} else {
return CompletableFuture.completedFuture(true);
}
}
private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
- final String checkOwner = getPrincipal();
+ assert ctx.executor().inEventLoop();
return service.pulsar().getTransactionMetadataStoreService()
- .verifyTxnOwnership(txnID, checkOwner)
- .thenCompose(isOwner -> {
+ .verifyTxnOwnership(txnID, getPrincipal())
+ .thenComposeAsync(isOwner -> {
if (isOwner) {
return CompletableFuture.completedFuture(true);
}
if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
- return getBrokerService()
- .getAuthorizationService()
- .isSuperUser(checkOwner,
getAuthenticationData());
+ return isSuperUser();
} else {
return CompletableFuture.completedFuture(false);
}
- });
+ }, ctx.executor());
}
@Override
@@ -2366,10 +2374,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
txnID, txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
- topicFuture.thenAccept(optionalTopic -> {
+ topicFuture.thenAcceptAsync(optionalTopic -> {
if (optionalTopic.isPresent()) {
- // we only accept super user becase this endpoint is reserved
for tc to broker communication
- verifyTxnOwnershipForTCToBrokerCommands()
+ // we only accept superuser because this endpoint is reserved
for tc to broker communication
+ isSuperUser()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
@@ -2419,7 +2427,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
});
}
- }).exceptionally(e -> {
+ }, ctx.executor()).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
@@ -2447,7 +2455,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
- topicFuture.thenAccept(optionalTopic -> {
+ topicFuture.thenAcceptAsync(optionalTopic -> {
if (optionalTopic.isPresent()) {
Subscription subscription =
optionalTopic.get().getSubscription(subName);
if (subscription == null) {
@@ -2459,7 +2467,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
// we only accept super user becase this endpoint is reserved
for tc to broker communication
- verifyTxnOwnershipForTCToBrokerCommands()
+ isSuperUser()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
@@ -2509,7 +2517,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
});
}
- }).exceptionally(e -> {
+ }, ctx.executor()).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic: {},
subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
@@ -2544,7 +2552,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn
command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
final long requestId = command.getRequestId();
- final List<org.apache.pulsar.common.api.proto.Subscription>
subscriptionsList = command.getSubscriptionsList();
+ final List<org.apache.pulsar.common.api.proto.Subscription>
subscriptionsList = new ArrayList<>();
+ for (org.apache.pulsar.common.api.proto.Subscription sub :
command.getSubscriptionsList()) {
+ subscriptionsList.add(new
org.apache.pulsar.common.api.proto.Subscription().copyFrom(sub));
+ }
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from
{} with txnId {}",
requestId, remoteAddress, txnID);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 96dd8a6cf10..5e4cd2fd0d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -75,7 +76,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
@@ -93,11 +96,13 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
@@ -2748,4 +2753,45 @@ public class ServerCnxTest {
verify(authResponse, times(1)).hasClientVersion();
verify(authResponse, times(0)).getClientVersion();
}
+
+ @Test(timeOut = 30000)
+ public void sendAddPartitionToTxnResponseFailedAuth() throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setProxyRoles(Collections.singleton("pass.fail"));
+
+
svcConfig.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class,
svcConfig,
+ pulsar.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(false));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+
+ ByteBuf connect = Commands.newConnect(authMethodName, "pass.fail",
"test", "localhost",
+ "pass.pass", "pass.pass", authMethodName);
+ channel.writeInbound(connect);
+ Object connectResponse = getResponse();
+ assertTrue(connectResponse instanceof CommandConnected);
+
+ ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+ Collections.singletonList("tenant/ns/topic1"));
+ channel.writeInbound(clientCommand);
+ CommandAddPartitionToTxnResponse response =
(CommandAddPartitionToTxnResponse) getResponse();
+
+ assertEquals(response.getError(), ServerError.TransactionNotFound);
+ verify(txnStore, never()).addProducedPartitionToTxn(any(TxnID.class),
any());
+
+ channel.finish();
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 5b9bd5e74ac..779a0bdba08 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -155,6 +156,12 @@ public class ClientChannelHelper {
protected void handleLookupResponse(CommandLookupTopicResponse
connection) {
queue.offer(new CommandLookupTopicResponse().copyFrom(connection));
}
+
+ @Override
+ protected void handleAddPartitionToTxnResponse(
+ CommandAddPartitionToTxnResponse
commandAddPartitionToTxnResponse) {
+ queue.offer(new
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
+ }
};
}