This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 459a7a57c1b [improve][txn] Allow superusers to abort transactions 
(#19467)
459a7a57c1b is described below

commit 459a7a57c1b67cfe161cdc40c007a1c2e403b7cd
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Feb 9 02:08:19 2023 +0100

    [improve][txn] Allow superusers to abort transactions (#19467)
    
    ### Motivation
    
    Superusers must always be allowed to abort a transaction, even if they are 
not the original owner.
    
    ### Modifications
    
    * Check that only the owner or a superuser is allowed to perform txn operations 
(end, add partition, and add subscription)
---
 .../broker/TransactionMetadataStoreService.java    |  20 +-
 .../pulsar/broker/admin/impl/TransactionsBase.java |   1 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 140 +++++++--
 .../pulsar/broker/service/ServerCnxTest.java       |  21 ++
 .../TransactionMetadataStoreServiceTest.java       |  36 ++-
 .../broker/stats/TransactionMetricsTest.java       |   8 +-
 ...thenticatedTransactionProducerConsumerTest.java | 330 +++++++++++++++++++++
 .../broker/transaction/TransactionTestBase.java    |  21 +-
 .../client/impl/TransactionEndToEndTest.java       |   2 +-
 .../common/policies/data/TransactionMetadata.java  |   3 +
 .../coordinator/TransactionMetadataStore.java      |   3 +-
 .../pulsar/transaction/coordinator/TxnMeta.java    |   7 +
 .../impl/InMemTransactionMetadataStore.java        |  10 +-
 .../impl/MLTransactionMetadataStore.java           | 104 ++++---
 .../transaction/coordinator/impl/TxnMetaImpl.java  |   8 +-
 .../src/main/proto/PulsarTransactionMetadata.proto |   1 +
 .../MLTransactionMetadataStoreTest.java            |  33 +--
 .../TransactionMetadataStoreProviderTest.java      |  12 +-
 18 files changed, 630 insertions(+), 130 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index d0cf22a8653..3e3b044ec51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -256,12 +256,13 @@ public class TransactionMetadataStoreService {
         }
     }
 
-    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID 
tcId, long timeoutInMills) {
+    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID 
tcId, long timeoutInMills,
+                                                   String owner) {
         TransactionMetadataStore store = stores.get(tcId);
         if (store == null) {
             return FutureUtil.failedFuture(new 
CoordinatorNotFoundException(tcId));
         }
-        return store.newTransaction(timeoutInMills);
+        return store.newTransaction(timeoutInMills, owner);
     }
 
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, 
List<String> partitions) {
@@ -483,6 +484,21 @@ public class TransactionMetadataStoreService {
         return Collections.unmodifiableMap(stores);
     }
 
+    public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String 
checkOwner) {
+        return getTxnMeta(txnID)
+                .thenCompose(meta -> {
+                    // owner was null in the old versions or no auth enabled
+                    if (meta.getOwner() == null) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    if (meta.getOwner().equals(checkOwner)) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    return CompletableFuture.completedFuture(false);
+                });
+    }
+
+
     public void close () {
         this.internalPinnedExecutor.shutdown();
         stores.forEach((tcId, metadataStore) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index f537f0ecdb9..d596cbdd39d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -215,6 +215,7 @@ public abstract class TransactionsBase extends 
AdminResource {
         transactionMetadata.status = txnMeta.status().name();
         transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
         transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
+        transactionMetadata.owner = txnMeta.getOwner();
 
         List<CompletableFuture<TransactionInPendingAckStats>> 
ackedPartitionsFutures = new ArrayList<>();
         Map<String, Map<String, 
CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d398dbba9b9..9cbdbcf1ae4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2399,7 +2399,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        transactionMetadataStoreService.newTransaction(tcId, 
command.getTxnTtlSeconds())
+        final String owner = getPrincipal();
+        transactionMetadataStoreService.newTransaction(tcId, 
command.getTxnTtlSeconds(), owner)
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
@@ -2443,9 +2444,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
-                command.getPartitionsList())
-                .whenComplete(((v, ex) -> {
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return transactionMetadataStoreService
+                            .addProducedPartitionToTxn(txnID, 
command.getPartitionsList());
+                })
+                .whenComplete((v, ex) -> {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published 
partition to txn request {}", requestId);
@@ -2462,7 +2469,25 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
-            }));
+                });
+    }
+
+    private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
+        String msg = String.format(
+                "Client (%s) is neither the owner of the transaction %s nor a 
super user",
+                getPrincipal(), txnID
+        );
+        log.warn("[{}] {}", remoteAddress, msg);
+        return CompletableFuture.failedFuture(new 
CoordinatorException.TransactionNotFoundException(msg));
+    }
+
+    private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
+        String msg = String.format(
+                "TC client (%s) is not a super user, and is not allowed to 
operate on transaction %s",
+                getPrincipal(), txnID
+        );
+        log.warn("[{}] {}", remoteAddress, msg);
+        return CompletableFuture.failedFuture(new 
CoordinatorException.TransactionNotFoundException(msg));
     }
 
     @Override
@@ -2480,12 +2505,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        transactionMetadataStoreService
-                .endTransaction(txnID, txnAction, false)
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return 
transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
+                })
                 .whenComplete((v, ex) -> {
                     if (ex == null) {
-                        commandSender.sendEndTxnResponse(requestId,
-                                txnID, txnAction);
+                        commandSender.sendEndTxnResponse(requestId, txnID, 
txnAction);
                     } else {
                         ex = handleTxnException(ex, 
BaseCommand.Type.END_TXN.name(), requestId);
                         commandSender.sendEndTxnErrorResponse(requestId, txnID,
@@ -2496,6 +2525,34 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 });
     }
 
+    private CompletableFuture<Boolean> 
verifyTxnOwnershipForTCToBrokerCommands() {
+        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
+            return getBrokerService()
+                    .getAuthorizationService()
+                    .isSuperUser(getPrincipal(), getAuthenticationData());
+        } else {
+            return CompletableFuture.completedFuture(true);
+        }
+    }
+
+    private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
+        final String checkOwner = getPrincipal();
+        return service.pulsar().getTransactionMetadataStoreService()
+                .verifyTxnOwnership(txnID, checkOwner)
+                .thenCompose(isOwner -> {
+                    if (isOwner) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
+                        return getBrokerService()
+                                .getAuthorizationService()
+                                .isSuperUser(checkOwner, 
getAuthenticationData());
+                    } else {
+                        return CompletableFuture.completedFuture(false);
+                    }
+                });
+    }
+
     @Override
     protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
         checkArgument(state == State.Connected);
@@ -2512,9 +2569,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
         topicFuture.thenAccept(optionalTopic -> {
             if (optionalTopic.isPresent()) {
-                optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
+                // we only accept super user because this endpoint is reserved 
for tc to broker communication
+                verifyTxnOwnershipForTCToBrokerCommands()
+                        .thenCompose(isOwner -> {
+                            if (!isOwner) {
+                                return failedFutureTxnTcNotAllowed(txnID);
+                            }
+                            return optionalTopic.get().endTxn(txnID, 
txnAction, lowWaterMark);
+                        })
                         .whenComplete((ignored, throwable) -> {
                             if (throwable != null) {
+                                throwable = 
FutureUtil.unwrapCompletionException(throwable);
                                 log.error("handleEndTxnOnPartition fail!, 
topic {}, txnId: [{}], "
                                         + "txnAction: [{}]", topic, txnID, 
TxnAction.valueOf(txnAction), throwable);
                                 
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2526,7 +2591,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                     txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         });
-
             } else {
                 getBrokerService().getManagedLedgerFactory()
                         
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2596,23 +2660,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                     return;
                 }
-
-                CompletableFuture<Void> completableFuture =
-                        subscription.endTxn(txnidMostBits, txnidLeastBits, 
txnAction, lowWaterMark);
-                completableFuture.whenComplete((ignored, e) -> {
-                    if (e != null) {
-                        log.error("handleEndTxnOnSubscription fail ! topic: 
{}, subscription: {}"
-                                        + "txnId: [{}], txnAction: [{}]", 
topic, subName,
-                                txnID, TxnAction.valueOf(txnAction), 
e.getCause());
-                        writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                                requestId, txnidLeastBits, txnidMostBits,
-                                BrokerServiceException.getClientErrorCode(e),
-                                "Handle end txn on subscription failed: " + 
e.getMessage()));
-                        return;
-                    }
-                    writeAndFlush(
-                            
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
-                });
+                // we only accept super user because this endpoint is reserved 
for tc to broker communication
+                verifyTxnOwnershipForTCToBrokerCommands()
+                        .thenCompose(isOwner -> {
+                            if (!isOwner) {
+                                return failedFutureTxnTcNotAllowed(txnID);
+                            }
+                            return subscription.endTxn(txnidMostBits, 
txnidLeastBits, txnAction, lowWaterMark);
+                        }).whenComplete((ignored, e) -> {
+                            if (e != null) {
+                                e = FutureUtil.unwrapCompletionException(e);
+                                log.error("handleEndTxnOnSubscription fail ! 
topic: {}, subscription: {}"
+                                                + "txnId: [{}], txnAction: 
[{}]", topic, subName,
+                                        txnID, TxnAction.valueOf(txnAction), 
e.getCause());
+                                
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                        requestId, txnidLeastBits, 
txnidMostBits,
+                                        
BrokerServiceException.getClientErrorCode(e),
+                                        "Handle end txn on subscription 
failed: " + e.getMessage()));
+                                return;
+                            }
+                            writeAndFlush(
+                                    
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
+                        });
             } else {
                 getBrokerService().getManagedLedgerFactory()
                         
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2679,6 +2748,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         checkArgument(state == State.Connected);
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
         final long requestId = command.getRequestId();
+        final List<org.apache.pulsar.common.api.proto.Subscription> 
subscriptionsList = command.getSubscriptionsList();
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from 
{} with txnId {}",
                     requestId, remoteAddress, txnID);
@@ -2693,9 +2763,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
-                
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
-                .whenComplete(((v, ex) -> {
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return 
transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
+                            
MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList));
+                })
+                .whenComplete((v, ex) -> {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published 
partition to txn request {}",
@@ -2711,7 +2787,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
-                }));
+                });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b8032f9be3a..b7e2839832c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -134,6 +134,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
@@ -3033,6 +3034,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendAddPartitionToTxnResponse() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3056,6 +3059,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendAddPartitionToTxnResponseFailed() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
                 .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3079,6 +3084,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendAddSubscriptionToTxnResponse() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3105,6 +3112,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
                 .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3132,6 +3141,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnResponse() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3155,6 +3166,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnResponseFailed() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3178,6 +3191,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnOnPartitionResponse() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3206,6 +3221,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnOnPartitionResponseFailed() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3235,6 +3252,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnOnSubscription() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3269,6 +3288,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void sendEndTxnOnSubscriptionFailed() throws Exception {
         final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), 
any())).thenReturn(CompletableFuture.completedFuture(true));
         when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 06fdc13b98c..5cd3ed9f904 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -104,9 +104,9 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 .getStores().get(TransactionCoordinatorID.get(1)));
         checkTransactionMetadataStoreReady((MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
                 .getStores().get(TransactionCoordinatorID.get(2)));
-        TxnID txnID0 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5).get();
-        TxnID txnID1 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 
5).get();
-        TxnID txnID2 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 
5).get();
+        TxnID txnID0 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5, null).get();
+        TxnID txnID1 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 
5, null).get();
+        TxnID txnID2 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 
5, null).get();
         Assert.assertEquals(txnID0.getMostSigBits(), 0);
         Assert.assertEquals(txnID1.getMostSigBits(), 1);
         Assert.assertEquals(txnID2.getMostSigBits(), 2);
@@ -128,7 +128,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5000).get();
+        TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5000, null).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
         partitions.add("ptn-1");
@@ -151,7 +151,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5000).get();
+        TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
5000, null).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
         
partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
@@ -180,7 +180,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         int i = -1;
         while (++i < 1000) {
             try {
-                transactionMetadataStore.newTransaction(2000).get();
+                newTransactionWithTimeoutOf(2000);
             } catch (Exception e) {
                 //no operation
             }
@@ -192,6 +192,14 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 .until(() -> txnMap.size() == 0);
     }
 
+    private TxnID newTransactionWithTimeoutOf(long timeout)
+            throws InterruptedException, ExecutionException {
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        return transactionMetadataStore.newTransaction(timeout, null).get();
+    }
+
     @Test
     public void testTimeoutTrackerExpired() throws Exception {
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
@@ -206,7 +214,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
                 (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
 
-        transactionMetadataStore.newTransaction(2000).get();
+        newTransactionWithTimeoutOf(2000);
 
         assertEquals(txnMap.size(), 1);
 
@@ -214,7 +222,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 Assert.assertEquals(txnMetaListPair.getLeft().status(), 
TxnStatus.OPEN));
         Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> 
txnMap.size() == 0);
 
-        transactionMetadataStore.newTransaction(2000).get();
+        newTransactionWithTimeoutOf(2000);
         assertEquals(txnMap.size(), 1);
 
         txnMap.forEach((txnID, txnMetaListPair) ->
@@ -241,7 +249,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(1000);
+                    newTransactionWithTimeoutOf(1000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -252,7 +260,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(2000);
+                    newTransactionWithTimeoutOf(2000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -263,7 +271,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(3000);
+                    newTransactionWithTimeoutOf(3000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -274,7 +282,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(4000);
+                    newTransactionWithTimeoutOf(4000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -304,7 +312,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
 
-        transactionMetadataStore.newTransaction(timeout);
+        newTransactionWithTimeoutOf(2000);
 
         pulsar.getTransactionMetadataStoreService()
                 
.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
@@ -345,7 +353,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
 
-        TxnID txnID = transactionMetadataStore.newTransaction(timeOut - 
2000).get();
+        TxnID txnID = newTransactionWithTimeoutOf(timeOut - 2000);
         TxnMeta txnMeta = transactionMetadataStore.getTxnMeta(txnID).get();
         txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 37d1f4b0860..30aedc02253 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -111,11 +111,11 @@ public class TransactionMetricsTest extends 
BrokerTestBase {
         Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 2);
         pulsar.getTransactionMetadataStoreService().getStores()
-                
.get(transactionCoordinatorIDOne).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDOne).newTransaction(timeout, 
null).get();
         pulsar.getTransactionMetadataStoreService().getStores()
-                
.get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDTwo).newTransaction(timeout, 
null).get();
         pulsar.getTransactionMetadataStoreService().getStores()
-                
.get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDTwo).newTransaction(timeout, 
null).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
@@ -202,7 +202,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         metric.forEach(item -> assertEquals(item.value, txnCount / 2));
 
         TxnID txnID = pulsar.getTransactionMetadataStoreService().getStores()
-                .get(transactionCoordinatorIDOne).newTransaction(1000).get();
+                .get(transactionCoordinatorIDOne).newTransaction(1000, 
null).get();
 
         Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> {
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
new file mode 100644
index 00000000000..aa9102189d4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that only a transaction's owner or a superuser can operate on that transaction.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class AuthenticatedTransactionProducerConsumerTest extends 
TransactionTestBase {
+
+    private static final String TOPIC = NAMESPACE1 + "/txn-auth";
+
+    private final String ADMIN_TOKEN;
+    private final String TOKEN_PUBLIC_KEY;
+    private final KeyPair kp;
+
+    AuthenticatedTransactionProducerConsumerTest() throws 
NoSuchAlgorithmException {
+        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
+        kp = kpg.generateKeyPair();
+
+        byte[] encodedPublicKey = kp.getPublic().getEncoded();
+        TOKEN_PUBLIC_KEY = "data:;base64," + 
Base64.getEncoder().encodeToString(encodedPublicKey);
+        ADMIN_TOKEN = generateToken(kp, "admin");
+    }
+
+
+    private String generateToken(KeyPair kp, String subject) {
+        PrivateKey pkey = kp.getPrivate();
+        long expMillis = System.currentTimeMillis() + 
Duration.ofHours(1).toMillis();
+        Date exp = new Date(expMillis);
+
+        return Jwts.builder()
+                .setSubject(subject)
+                .setExpiration(exp)
+                .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
+                .compact();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        // Configure the public key the token authentication provider validates against
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+        setBrokerCount(1);
+        internalSetup();
+        setUpBase(1, 1, TOPIC, 1);
+
+        grantTxnLookupToRole("client");
+        admin.namespaces().grantPermissionOnNamespace(NAMESPACE1, "client",
+                EnumSet.allOf(AuthAction.class));
+        grantTxnLookupToRole("client2");
+    }
+
+    @SneakyThrows
+    private void grantTxnLookupToRole(String role) {
+        admin.namespaces().grantPermissionOnNamespace(
+                NamespaceName.SYSTEM_NAMESPACE.toString(),
+                role,
+                Sets.newHashSet(AuthAction.consume));
+    }
+
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        return clientBuilder
+                .enableTransaction(true)
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @Override
+    protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) 
throws PulsarClientException {
+        return builder
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "actors")
+    public Object[][] actors() {
+        return new Object[][]{
+                {"client", true},
+                {"client", false},
+                {"client2", true},
+                {"client2", false},
+                {"admin", true},
+                {"admin", false}
+        };
+    }
+
+    @Test(dataProvider = "actors")
+    public void testEndTxn(String actor, boolean afterUnload) throws Exception 
{
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
"client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        @Cleanup final Consumer<String> consumer = pulsarClientOwner
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test")
+                .topic(TOPIC)
+                .subscribe();
+
+
+        @Cleanup final Producer<String> producer = pulsarClientOwner
+                .newProducer(Schema.STRING)
+                .sendTimeout(60, TimeUnit.SECONDS)
+                .topic(TOPIC)
+                .create();
+
+        producer.newMessage().value("beforetxn").send();
+        consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), transaction);
+        producer.newMessage(transaction).value("message").send();
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+        final Throwable ex = syncGetException((
+                (PulsarClientImpl) 
pulsarClientOther).getTcClient().commitAsync(transaction.getTxnID())
+        );
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(consumer.receive(5, 
TimeUnit.SECONDS).getValue(), "message");
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof 
TransactionCoordinatorClientException, ex.getClass().getName());
+            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
+            transaction.commit().get();
+            Assert.assertEquals(consumer.receive(5, 
TimeUnit.SECONDS).getValue(), "message");
+        }
+    }
+
+    @Test(dataProvider = "actors")
+    public void testAddPartitionToTxn(String actor, boolean afterUnload) 
throws Exception {
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
"client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+        final Throwable ex = syncGetException(((PulsarClientImpl) 
pulsarClientOther)
+                
.getTcClient().addPublishPartitionToTxnAsync(transaction.getTxnID(), 
List.of(TOPIC)));
+
+        final TxnMeta txnMeta = 
pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .getTxnMeta(transaction.getTxnID()).get();
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(txnMeta.producedPartitions(), List.of(TOPIC));
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof 
TransactionCoordinatorClientException);
+            Assert.assertTrue(txnMeta.producedPartitions().isEmpty());
+        }
+    }
+
+    @Test(dataProvider = "actors")
+    public void testAddSubscriptionToTxn(String actor, boolean afterUnload) 
throws Exception {
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
"client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, 
actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+
+        final Throwable ex = syncGetException(((PulsarClientImpl) 
pulsarClientOther)
+                
.getTcClient().addSubscriptionToTxnAsync(transaction.getTxnID(), TOPIC, "sub"));
+
+        final TxnMeta txnMeta = 
pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .getTxnMeta(transaction.getTxnID()).get();
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(txnMeta.ackedPartitions().size(), 1);
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof 
TransactionCoordinatorClientException);
+            Assert.assertTrue(txnMeta.ackedPartitions().isEmpty());
+        }
+    }
+
+    @Test
+    public void testNoAuth() throws Exception {
+        try {
+            @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+                    .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                    .enableTransaction(true)
+                    .build();
+            Assert.fail("should have failed");
+        } catch (Exception t) {
+            Assert.assertTrue(Exceptions.areExceptionsPresentInChain(t,
+                    PulsarClientException.AuthenticationException.class));
+        }
+    }
+
+    private static Throwable syncGetException(CompletableFuture<?> future) {
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            return e;
+        } catch (ExecutionException e) {
+            return FutureUtil.unwrapCompletionException(e);
+        }
+        return null;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 444c68df055..fd49354342f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -35,7 +35,10 @@ import 
org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -71,7 +74,9 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         if (admin != null) {
             admin.close();
         }
-        admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
+        admin = spy(
+                
createNewPulsarAdmin(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()))
+        );
 
         if (pulsarClient != null) {
             pulsarClient.shutdown();
@@ -82,6 +87,15 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
     private void init() throws Exception {
         startBroker();
     }
+
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        return clientBuilder.build();
+    }
+
+    protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) 
throws PulsarClientException {
+        return builder.build();
+    }
+
     protected void setUpBase(int numBroker,int numPartitionsOfTC, String 
topic, int numPartitions) throws Exception{
         setBrokerCount(numBroker);
         internalSetup();
@@ -108,11 +122,10 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         if (pulsarClient != null) {
             pulsarClient.shutdown();
         }
-        pulsarClient = PulsarClient.builder()
+        pulsarClient = createNewPulsarClient(PulsarClient.builder()
                 
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                 .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
+                .enableTransaction(true));
     }
 
     protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 663c1c50ce7..696a0a7957c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -896,7 +896,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         long timeoutCountOriginal = transactionMetadataStores.stream()
                 .mapToLong(store -> 
store.getMetadataStoreStats().timeoutCount).sum();
         TxnID txnID = 
pulsarServiceList.get(0).getTransactionMetadataStoreService()
-                .newTransaction(new TransactionCoordinatorID(0), 1).get();
+                .newTransaction(new TransactionCoordinatorID(0), 1, 
null).get();
         Awaitility.await().until(() -> {
             try {
                
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
index 73fb43e45da..d22b956e8b0 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
@@ -41,4 +41,7 @@ public class TransactionMetadata {
 
     /** The ackedPartitions of this transaction. */
     public Map<String, Map<String, TransactionInPendingAckStats>> 
ackedPartitions;
+
+    /** The owner of this transaction. */
+    public String owner;
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 7f8280f5226..ff5adb4d409 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -56,11 +56,12 @@ public interface TransactionMetadataStore {
      * Create a new transaction in the transaction metadata store.
      *
      * @param timeoutInMills the timeout duration of the transaction in mills
+     * @param owner the role which is the owner of the transaction
      * @return a future represents the result of creating a new transaction.
      *         it returns {@link TxnID} as the identifier for identifying the
      *         transaction.
      */
-    CompletableFuture<TxnID> newTransaction(long timeoutInMills);
+    CompletableFuture<TxnID> newTransaction(long timeoutInMills, String owner);
 
     /**
      * Add the produced partitions to transaction identified by <tt>txnid</tt>.
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index 104ae0ff1df..44f225b9448 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -107,4 +107,11 @@ public interface TxnMeta {
      * @return transaction timeout at.
      */
     long getTimeoutAt();
+
+    /**
+     * Return the transaction's owner.
+     *
+     * @return transaction's owner.
+     */
+    String getOwner();
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index a62df1df8c6..0f3c5e42d7a 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -73,12 +74,17 @@ class InMemTransactionMetadataStore implements 
TransactionMetadataStore {
     }
 
     @Override
-    public CompletableFuture<TxnID> newTransaction(long timeoutInMills) {
+    public CompletableFuture<TxnID> newTransaction(long timeoutInMills, String 
owner) {
+        if (owner != null) {
+            if (StringUtils.isBlank(owner)) {
+                return CompletableFuture.failedFuture(new 
IllegalArgumentException("Owner can't be blank"));
+            }
+        }
         TxnID txnID = new TxnID(
             tcID.getId(),
             localID.getAndIncrement()
         );
-        TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), 
timeoutInMills);
+        TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), 
timeoutInMills, owner);
         transactions.put(txnID, txn);
         return CompletableFuture.completedFuture(txnID);
     }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 903b04a1b5d..b6eaad2e3e3 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
@@ -149,8 +150,11 @@ public class MLTransactionMetadataStore
                                     positions.add(position);
                                     long openTimestamp = 
transactionMetadataEntry.getStartTime();
                                     long timeoutAt = 
transactionMetadataEntry.getTimeoutMs();
-                                    txnMetaMap.put(transactionId, 
MutablePair.of(new TxnMetaImpl(txnID,
-                                            openTimestamp, timeoutAt), 
positions));
+                                    final String owner = 
transactionMetadataEntry.hasOwner()
+                                            ? 
transactionMetadataEntry.getOwner() : null;
+                                    final TxnMetaImpl left = new 
TxnMetaImpl(txnID,
+                                            openTimestamp, timeoutAt, owner);
+                                    txnMetaMap.put(transactionId, 
MutablePair.of(left, positions));
                                     
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
                                             timeoutAt + openTimestamp);
                                 }
@@ -224,50 +228,58 @@ public class MLTransactionMetadataStore
     }
 
     @Override
-    public CompletableFuture<TxnID> newTransaction(long timeOut) {
-        if (this.maxActiveTransactionsPerCoordinator != 0
-                && this.maxActiveTransactionsPerCoordinator <= txnMetaMap.size()) {
+    public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) {
+        if (this.maxActiveTransactionsPerCoordinator == 0
+                || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
+            CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+            FutureUtil.safeRunAsync(() -> {
+                if (!checkIfReady()) {
+                    completableFuture.completeExceptionally(new CoordinatorException
+                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                    return;
+                }
+
+                long mostSigBits = tcID.getId();
+                long leastSigBits = sequenceIdGenerator.generateSequenceId();
+                TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+                long currentTimeMillis = System.currentTimeMillis();
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(mostSigBits)
+                        .setTxnidLeastBits(leastSigBits)
+                        .setStartTime(currentTimeMillis)
+                        .setTimeoutMs(timeOut)
+                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                        .setLastModificationTime(currentTimeMillis)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                if (owner != null) {
+                    if (StringUtils.isBlank(owner)) {
+                        completableFuture.completeExceptionally(new IllegalArgumentException("Owner can't be blank"));
+                        return;
+                    }
+                    transactionMetadataEntry.setOwner(owner);
+                }
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, throwable) -> {
+                            if (throwable != null) {
+                                completableFuture.completeExceptionally(throwable);
+                            } else {
+                                appendLogCount.increment();
+                                TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner);
+                                List<Position> positions = new ArrayList<>();
+                                positions.add(position);
+                                Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                                txnMetaMap.put(leastSigBits, pair);
+                                this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                                createdTransactionCount.increment();
+                                completableFuture.complete(txnID);
+                            }
+                        });
+            }, internalPinnedExecutor, completableFuture);
+            return completableFuture;
+        } else {
             return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
                     + "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
         }
-        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
-        FutureUtil.safeRunAsync(() -> {
-            if (!checkIfReady()) {
-                completableFuture.completeExceptionally(new CoordinatorException
-                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-                return;
-            }
-
-            long mostSigBits = tcID.getId();
-            long leastSigBits = sequenceIdGenerator.generateSequenceId();
-            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-            long currentTimeMillis = System.currentTimeMillis();
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(mostSigBits)
-                    .setTxnidLeastBits(leastSigBits)
-                    .setStartTime(currentTimeMillis)
-                    .setTimeoutMs(timeOut)
-                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                    .setLastModificationTime(currentTimeMillis)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-            transactionLog.append(transactionMetadataEntry)
-                    .whenComplete((position, throwable) -> {
-                        if (throwable != null) {
-                            completableFuture.completeExceptionally(throwable);
-                        } else {
-                            appendLogCount.increment();
-                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                            List<Position> positions = new ArrayList<>();
-                            positions.add(position);
-                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                            txnMetaMap.put(leastSigBits, pair);
-                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                            createdTransactionCount.increment();
-                            completableFuture.complete(txnID);
-                        }
-                    });
-        }, internalPinnedExecutor, completableFuture);
-        return completableFuture;
     }
 
     @Override
@@ -300,9 +312,9 @@ public class MLTransactionMetadataStore
                                 promise.complete(null);
                             } catch (InvalidTxnStatusException e) {
                                 transactionLog.deletePosition(Collections.singletonList(position));
-                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                        + " add produced partition error with TxnStatus : "
-                                        + txnMetaListPair.getLeft().status().name(), e);
+                                log.error("TxnID {} add produced partition error"
+                                                + " with TxnStatus: {}", txnMetaListPair.getLeft().id().toString()
+                                        , txnMetaListPair.getLeft().status().name(), e);
                                 promise.completeExceptionally(e);
                             }
                         });
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 8e0c793b588..ed38305abf0 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -42,11 +42,13 @@ class TxnMetaImpl implements TxnMeta {
     private volatile TxnStatus txnStatus = TxnStatus.OPEN;
     private final long openTimestamp;
     private final long timeoutAt;
+    private final String owner;
 
-    TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt) {
+    TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner) {
         this.txnID = txnID;
         this.openTimestamp = openTimestamp;
         this.timeoutAt = timeoutAt;
+        this.owner = owner;
     }
 
     @Override
@@ -161,4 +163,8 @@ class TxnMetaImpl implements TxnMeta {
         return this.timeoutAt;
     }
 
+    @Override
+    public String getOwner() {
+        return this.owner;
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
index 6d506d48176..134d1cf3b51 100644
--- a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -51,6 +51,7 @@ message TransactionMetadataEntry {
   optional uint64 start_time      = 9;
   optional uint64 last_modification_time = 10;
   optional uint64 max_local_txn_id = 11;
+  optional string owner          = 12;
 }
 
 message BatchedTransactionMetadataEntry{
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 2f806f0a474..3b831ad38ba 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -100,7 +100,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID = transactionMetadataStore.newTransaction(5000).get();
+                TxnID txnID = transactionMetadataStore.newTransaction(5000, null).get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN);
 
                 List<String> partitions = new ArrayList<>();
@@ -181,7 +181,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
+        TxnID txnID = transactionMetadataStore.newTransaction(20000, null).get();
         transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
         if (isUseManagedLedgerProperties) {
             transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
@@ -209,7 +209,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        txnID = transactionMetadataStore.newTransaction(100000).get();
+        txnID = transactionMetadataStore.newTransaction(100000, null).get();
         assertEquals(txnID.getLeastSigBits(), 1);
     }
 
@@ -244,10 +244,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                CompletableFuture<TxnID> txIDFuture1 = transactionMetadataStore.newTransaction(1000);
-                CompletableFuture<TxnID> txIDFuture2 = transactionMetadataStore.newTransaction(1000);
-                TxnID txnID1 = txIDFuture1.get();
-                TxnID txnID2 = txIDFuture2.get();
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000, "user1").get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000, "user2").get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
@@ -306,6 +304,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                         assertEquals(txnMeta2.ackedPartitions().size(), subscriptions.size());
                         Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions()));
                         Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions()));
+
+                        assertEquals(txnMeta1.getOwner(), "user1");
+                        assertEquals(txnMeta2.getOwner(), "user2");
                         assertEquals(txnMeta1.status(), TxnStatus.COMMITTING);
                         assertEquals(txnMeta2.status(), TxnStatus.COMMITTING);
                         transactionMetadataStoreTest
@@ -325,7 +326,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                         } catch (ExecutionException e) {
                             Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
                         }
-                        TxnID txnID = transactionMetadataStoreTest.newTransaction(1000).get();
+                        TxnID txnID = transactionMetadataStoreTest.newTransaction(1000, null).get();
                         assertEquals(txnID.getLeastSigBits(), 2L);
                         break;
                     } else {
@@ -370,10 +371,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                CompletableFuture<TxnID> txIDFuture1 = transactionMetadataStore.newTransaction(1000);
-                CompletableFuture<TxnID> txIDFuture2 = transactionMetadataStore.newTransaction(1000);
-                TxnID txnID1 = txIDFuture1.get();
-                TxnID txnID2 = txIDFuture2.get();
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
@@ -447,9 +446,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
 
         // txnID1 have not deleted from cursor, we can recover from transaction log
-        TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+        TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get();
         // txnID2 have deleted from cursor.
-        TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+        TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get();
 
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN, false).get();
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get();
@@ -485,7 +484,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        transactionMetadataStore.newTransaction(5000).get();
+        transactionMetadataStore.newTransaction(5000, null).get();
         Field field = MLTransactionLogImpl.class.getDeclaredField("managedLedger");
         field.setAccessible(true);
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
@@ -494,12 +493,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         AtomicReferenceFieldUpdater state = (AtomicReferenceFieldUpdater) field.get(managedLedger);
         state.set(managedLedger, WriteFailed);
         try {
-            transactionMetadataStore.newTransaction(5000).get();
+            transactionMetadataStore.newTransaction(5000, null).get();
             fail();
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
         }
-        transactionMetadataStore.newTransaction(5000).get();
+        transactionMetadataStore.newTransaction(5000, null).get();
 
     }
 
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index c7f9cd21ac8..04b2d2fe650 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -92,14 +92,14 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testGetTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
     }
 
     @Test
     public void testUpdateTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -113,7 +113,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -132,7 +132,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusCannotTransition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -151,7 +151,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddProducedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -205,7 +205,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddAckedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 

Reply via email to