This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 459a7a57c1b [improve][txn] Allow superusers to abort transactions
(#19467)
459a7a57c1b is described below
commit 459a7a57c1b67cfe161cdc40c007a1c2e403b7cd
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Feb 9 02:08:19 2023 +0100
[improve][txn] Allow superusers to abort transactions (#19467)
### Motivation
Super users must be always allowed to abort a transaction even if they're
not the original owner.
### Modifications
* Check that only owner or superusers are allowed to perform txn operations
(end, add partition and add subscription)
---
.../broker/TransactionMetadataStoreService.java | 20 +-
.../pulsar/broker/admin/impl/TransactionsBase.java | 1 +
.../apache/pulsar/broker/service/ServerCnx.java | 140 +++++++--
.../pulsar/broker/service/ServerCnxTest.java | 21 ++
.../TransactionMetadataStoreServiceTest.java | 36 ++-
.../broker/stats/TransactionMetricsTest.java | 8 +-
...thenticatedTransactionProducerConsumerTest.java | 330 +++++++++++++++++++++
.../broker/transaction/TransactionTestBase.java | 21 +-
.../client/impl/TransactionEndToEndTest.java | 2 +-
.../common/policies/data/TransactionMetadata.java | 3 +
.../coordinator/TransactionMetadataStore.java | 3 +-
.../pulsar/transaction/coordinator/TxnMeta.java | 7 +
.../impl/InMemTransactionMetadataStore.java | 10 +-
.../impl/MLTransactionMetadataStore.java | 104 ++++---
.../transaction/coordinator/impl/TxnMetaImpl.java | 8 +-
.../src/main/proto/PulsarTransactionMetadata.proto | 1 +
.../MLTransactionMetadataStoreTest.java | 33 +--
.../TransactionMetadataStoreProviderTest.java | 12 +-
18 files changed, 630 insertions(+), 130 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index d0cf22a8653..3e3b044ec51 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -256,12 +256,13 @@ public class TransactionMetadataStoreService {
}
}
- public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID
tcId, long timeoutInMills) {
+ public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID
tcId, long timeoutInMills,
+ String owner) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new
CoordinatorNotFoundException(tcId));
}
- return store.newTransaction(timeoutInMills);
+ return store.newTransaction(timeoutInMills, owner);
}
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId,
List<String> partitions) {
@@ -483,6 +484,21 @@ public class TransactionMetadataStoreService {
return Collections.unmodifiableMap(stores);
}
+ public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String
checkOwner) {
+ return getTxnMeta(txnID)
+ .thenCompose(meta -> {
+ // owner was null in the old versions or no auth enabled
+ if (meta.getOwner() == null) {
+ return CompletableFuture.completedFuture(true);
+ }
+ if (meta.getOwner().equals(checkOwner)) {
+ return CompletableFuture.completedFuture(true);
+ }
+ return CompletableFuture.completedFuture(false);
+ });
+ }
+
+
public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index f537f0ecdb9..d596cbdd39d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -215,6 +215,7 @@ public abstract class TransactionsBase extends
AdminResource {
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
+ transactionMetadata.owner = txnMeta.getOwner();
List<CompletableFuture<TransactionInPendingAckStats>>
ackedPartitionsFutures = new ArrayList<>();
Map<String, Map<String,
CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d398dbba9b9..9cbdbcf1ae4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2399,7 +2399,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- transactionMetadataStoreService.newTransaction(tcId,
command.getTxnTtlSeconds())
+ final String owner = getPrincipal();
+ transactionMetadataStoreService.newTransaction(tcId,
command.getTxnTtlSeconds(), owner)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
@@ -2443,9 +2444,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
-
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
- command.getPartitionsList())
- .whenComplete(((v, ex) -> {
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnNotOwned(txnID);
+ }
+ return transactionMetadataStoreService
+ .addProducedPartitionToTxn(txnID,
command.getPartitionsList());
+ })
+ .whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published
partition to txn request {}", requestId);
@@ -2462,7 +2469,25 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
- }));
+ });
+ }
+
+ private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
+ String msg = String.format(
+ "Client (%s) is neither the owner of the transaction %s nor a
super user",
+ getPrincipal(), txnID
+ );
+ log.warn("[{}] {}", remoteAddress, msg);
+ return CompletableFuture.failedFuture(new
CoordinatorException.TransactionNotFoundException(msg));
+ }
+
+ private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
+ String msg = String.format(
+ "TC client (%s) is not a super user, and is not allowed to
operate on transaction %s",
+ getPrincipal(), txnID
+ );
+ log.warn("[{}] {}", remoteAddress, msg);
+ return CompletableFuture.failedFuture(new
CoordinatorException.TransactionNotFoundException(msg));
}
@Override
@@ -2480,12 +2505,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- transactionMetadataStoreService
- .endTransaction(txnID, txnAction, false)
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnNotOwned(txnID);
+ }
+ return
transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
+ })
.whenComplete((v, ex) -> {
if (ex == null) {
- commandSender.sendEndTxnResponse(requestId,
- txnID, txnAction);
+ commandSender.sendEndTxnResponse(requestId, txnID,
txnAction);
} else {
ex = handleTxnException(ex,
BaseCommand.Type.END_TXN.name(), requestId);
commandSender.sendEndTxnErrorResponse(requestId, txnID,
@@ -2496,6 +2525,34 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
+ private CompletableFuture<Boolean>
verifyTxnOwnershipForTCToBrokerCommands() {
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ return getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(getPrincipal(), getAuthenticationData());
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ }
+
+ private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
+ final String checkOwner = getPrincipal();
+ return service.pulsar().getTransactionMetadataStoreService()
+ .verifyTxnOwnership(txnID, checkOwner)
+ .thenCompose(isOwner -> {
+ if (isOwner) {
+ return CompletableFuture.completedFuture(true);
+ }
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ return getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(checkOwner,
getAuthenticationData());
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
+ });
+ }
+
@Override
protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
checkArgument(state == State.Connected);
@@ -2512,9 +2569,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
- optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
+ // we only accept super user becase this endpoint is reserved
for tc to broker communication
+ verifyTxnOwnershipForTCToBrokerCommands()
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnTcNotAllowed(txnID);
+ }
+ return optionalTopic.get().endTxn(txnID,
txnAction, lowWaterMark);
+ })
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
+ throwable =
FutureUtil.unwrapCompletionException(throwable);
log.error("handleEndTxnOnPartition fail!,
topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), throwable);
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2526,7 +2591,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
});
-
} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2596,23 +2660,28 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
return;
}
-
- CompletableFuture<Void> completableFuture =
- subscription.endTxn(txnidMostBits, txnidLeastBits,
txnAction, lowWaterMark);
- completableFuture.whenComplete((ignored, e) -> {
- if (e != null) {
- log.error("handleEndTxnOnSubscription fail ! topic:
{}, subscription: {}"
- + "txnId: [{}], txnAction: [{}]",
topic, subName,
- txnID, TxnAction.valueOf(txnAction),
e.getCause());
- writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnidLeastBits, txnidMostBits,
- BrokerServiceException.getClientErrorCode(e),
- "Handle end txn on subscription failed: " +
e.getMessage()));
- return;
- }
- writeAndFlush(
-
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
- });
+ // we only accept super user becase this endpoint is reserved
for tc to broker communication
+ verifyTxnOwnershipForTCToBrokerCommands()
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnTcNotAllowed(txnID);
+ }
+ return subscription.endTxn(txnidMostBits,
txnidLeastBits, txnAction, lowWaterMark);
+ }).whenComplete((ignored, e) -> {
+ if (e != null) {
+ e = FutureUtil.unwrapCompletionException(e);
+ log.error("handleEndTxnOnSubscription fail !
topic: {}, subscription: {}"
+ + "txnId: [{}], txnAction:
[{}]", topic, subName,
+ txnID, TxnAction.valueOf(txnAction),
e.getCause());
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnidLeastBits,
txnidMostBits,
+
BrokerServiceException.getClientErrorCode(e),
+ "Handle end txn on subscription
failed: " + e.getMessage()));
+ return;
+ }
+ writeAndFlush(
+
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
+ });
} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2679,6 +2748,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
checkArgument(state == State.Connected);
final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
final long requestId = command.getRequestId();
+ final List<org.apache.pulsar.common.api.proto.Subscription>
subscriptionsList = command.getSubscriptionsList();
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from
{} with txnId {}",
requestId, remoteAddress, txnID);
@@ -2693,9 +2763,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
-
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
- .whenComplete(((v, ex) -> {
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnNotOwned(txnID);
+ }
+ return
transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
+
MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList));
+ })
+ .whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published
partition to txn request {}",
@@ -2711,7 +2787,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
- }));
+ });
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b8032f9be3a..b7e2839832c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -134,6 +134,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
@@ -3033,6 +3034,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendAddPartitionToTxnResponse() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3056,6 +3059,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendAddPartitionToTxnResponseFailed() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
.thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3079,6 +3084,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendAddSubscriptionToTxnResponse() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3105,6 +3112,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
.thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3132,6 +3141,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnResponse() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3155,6 +3166,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnResponseFailed() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3178,6 +3191,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnOnPartitionResponse() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3206,6 +3221,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnOnPartitionResponseFailed() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3235,6 +3252,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnOnSubscription() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
@@ -3269,6 +3288,8 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void sendEndTxnOnSubscriptionFailed() throws Exception {
final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+
when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+ when(txnStore.verifyTxnOwnership(any(),
any())).thenReturn(CompletableFuture.completedFuture(true));
when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 06fdc13b98c..5cd3ed9f904 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -104,9 +104,9 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.getStores().get(TransactionCoordinatorID.get(1)));
checkTransactionMetadataStoreReady((MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(2)));
- TxnID txnID0 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5).get();
- TxnID txnID1 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1),
5).get();
- TxnID txnID2 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2),
5).get();
+ TxnID txnID0 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5, null).get();
+ TxnID txnID1 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1),
5, null).get();
+ TxnID txnID2 =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2),
5, null).get();
Assert.assertEquals(txnID0.getMostSigBits(), 0);
Assert.assertEquals(txnID1.getMostSigBits(), 1);
Assert.assertEquals(txnID2.getMostSigBits(), 2);
@@ -128,7 +128,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- TxnID txnID =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5000).get();
+ TxnID txnID =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5000, null).get();
List<String> partitions = new ArrayList<>();
partitions.add("ptn-0");
partitions.add("ptn-1");
@@ -151,7 +151,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
(MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- TxnID txnID =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5000).get();
+ TxnID txnID =
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0),
5000, null).get();
List<TransactionSubscription> partitions = new ArrayList<>();
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
@@ -180,7 +180,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
int i = -1;
while (++i < 1000) {
try {
- transactionMetadataStore.newTransaction(2000).get();
+ newTransactionWithTimeoutOf(2000);
} catch (Exception e) {
//no operation
}
@@ -192,6 +192,14 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.until(() -> txnMap.size() == 0);
}
+ private TxnID newTransactionWithTimeoutOf(long timeout)
+ throws InterruptedException, ExecutionException {
+ MLTransactionMetadataStore transactionMetadataStore =
+ (MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
+ .getStores().get(TransactionCoordinatorID.get(0));
+ return transactionMetadataStore.newTransaction(timeout, null).get();
+ }
+
@Test
public void testTimeoutTrackerExpired() throws Exception {
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
@@ -206,7 +214,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>)
field.get(transactionMetadataStore);
- transactionMetadataStore.newTransaction(2000).get();
+ newTransactionWithTimeoutOf(2000);
assertEquals(txnMap.size(), 1);
@@ -214,7 +222,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
Assert.assertEquals(txnMetaListPair.getLeft().status(),
TxnStatus.OPEN));
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() ->
txnMap.size() == 0);
- transactionMetadataStore.newTransaction(2000).get();
+ newTransactionWithTimeoutOf(2000);
assertEquals(txnMap.size(), 1);
txnMap.forEach((txnID, txnMetaListPair) ->
@@ -241,7 +249,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
int i = -1;
while (++i < 100) {
try {
- transactionMetadataStore.newTransaction(1000);
+ newTransactionWithTimeoutOf(1000);
} catch (Exception e) {
//no operation
}
@@ -252,7 +260,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
int i = -1;
while (++i < 100) {
try {
- transactionMetadataStore.newTransaction(2000);
+ newTransactionWithTimeoutOf(2000);
} catch (Exception e) {
//no operation
}
@@ -263,7 +271,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
int i = -1;
while (++i < 100) {
try {
- transactionMetadataStore.newTransaction(3000);
+ newTransactionWithTimeoutOf(3000);
} catch (Exception e) {
//no operation
}
@@ -274,7 +282,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
int i = -1;
while (++i < 100) {
try {
- transactionMetadataStore.newTransaction(4000);
+ newTransactionWithTimeoutOf(4000);
} catch (Exception e) {
//no operation
}
@@ -304,7 +312,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- transactionMetadataStore.newTransaction(timeout);
+ newTransactionWithTimeoutOf(2000);
pulsar.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
@@ -345,7 +353,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
checkTransactionMetadataStoreReady(transactionMetadataStore);
- TxnID txnID = transactionMetadataStore.newTransaction(timeOut -
2000).get();
+ TxnID txnID = newTransactionWithTimeoutOf(timeOut - 2000);
TxnMeta txnMeta = transactionMetadataStore.getTxnMeta(txnID).get();
txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 37d1f4b0860..30aedc02253 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -111,11 +111,11 @@ public class TransactionMetricsTest extends
BrokerTestBase {
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 2);
pulsar.getTransactionMetadataStoreService().getStores()
-
.get(transactionCoordinatorIDOne).newTransaction(timeout).get();
+ .get(transactionCoordinatorIDOne).newTransaction(timeout,
null).get();
pulsar.getTransactionMetadataStoreService().getStores()
-
.get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+ .get(transactionCoordinatorIDTwo).newTransaction(timeout,
null).get();
pulsar.getTransactionMetadataStoreService().getStores()
-
.get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+ .get(transactionCoordinatorIDTwo).newTransaction(timeout,
null).get();
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false,
statsOut);
String metricsStr = statsOut.toString();
@@ -202,7 +202,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
metric.forEach(item -> assertEquals(item.value, txnCount / 2));
TxnID txnID = pulsar.getTransactionMetadataStoreService().getStores()
- .get(transactionCoordinatorIDOne).newTransaction(1000).get();
+ .get(transactionCoordinatorIDOne).newTransaction(1000,
null).get();
Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
new file mode 100644
index 00000000000..aa9102189d4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for consuming transaction messages.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class AuthenticatedTransactionProducerConsumerTest extends
TransactionTestBase {
+
+ private static final String TOPIC = NAMESPACE1 + "/txn-auth";
+
+ private final String ADMIN_TOKEN;
+ private final String TOKEN_PUBLIC_KEY;
+ private final KeyPair kp;
+
+ AuthenticatedTransactionProducerConsumerTest() throws
NoSuchAlgorithmException {
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
+ kp = kpg.generateKeyPair();
+
+ byte[] encodedPublicKey = kp.getPublic().getEncoded();
+ TOKEN_PUBLIC_KEY = "data:;base64," +
Base64.getEncoder().encodeToString(encodedPublicKey);
+ ADMIN_TOKEN = generateToken(kp, "admin");
+ }
+
+
+ private String generateToken(KeyPair kp, String subject) {
+ PrivateKey pkey = kp.getPrivate();
+ long expMillis = System.currentTimeMillis() +
Duration.ofHours(1).toMillis();
+ Date exp = new Date(expMillis);
+
+ return Jwts.builder()
+ .setSubject(subject)
+ .setExpiration(exp)
+ .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
+ .compact();
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup() throws Exception {
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ // Set provider domain name
+ Properties properties = new Properties();
+ properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+
+ conf.setProperties(properties);
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+ setBrokerCount(1);
+ internalSetup();
+ setUpBase(1, 1, TOPIC, 1);
+
+ grantTxnLookupToRole("client");
+ admin.namespaces().grantPermissionOnNamespace(NAMESPACE1, "client",
+ EnumSet.allOf(AuthAction.class));
+ grantTxnLookupToRole("client2");
+ }
+
+ @SneakyThrows
+ private void grantTxnLookupToRole(String role) {
+ admin.namespaces().grantPermissionOnNamespace(
+ NamespaceName.SYSTEM_NAMESPACE.toString(),
+ role,
+ Sets.newHashSet(AuthAction.consume));
+ }
+
+ @Override
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ return clientBuilder
+ .enableTransaction(true)
+ .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+ .build();
+ }
+
+ @Override
+ protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder)
throws PulsarClientException {
+ return builder
+ .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+ .build();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() {
+ super.internalCleanup();
+ }
+
+ @DataProvider(name = "actors")
+ public Object[][] actors() {
+ return new Object[][]{
+ {"client", true},
+ {"client", false},
+ {"client2", true},
+ {"client2", false},
+ {"admin", true},
+ {"admin", false}
+ };
+ }
+
+ @Test(dataProvider = "actors")
+ public void testEndTxn(String actor, boolean afterUnload) throws Exception
{
+ @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
"client")))
+ .enableTransaction(true)
+ .build();
+
+ @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
actor)))
+ .enableTransaction(true)
+ .build();
+ Transaction transaction = pulsarClientOwner.newTransaction()
+ .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+ @Cleanup final Consumer<String> consumer = pulsarClientOwner
+ .newConsumer(Schema.STRING)
+ .subscriptionName("test")
+ .topic(TOPIC)
+ .subscribe();
+
+
+ @Cleanup final Producer<String> producer = pulsarClientOwner
+ .newProducer(Schema.STRING)
+ .sendTimeout(60, TimeUnit.SECONDS)
+ .topic(TOPIC)
+ .create();
+
+ producer.newMessage().value("beforetxn").send();
+ consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), transaction);
+ producer.newMessage(transaction).value("message").send();
+ if (afterUnload) {
+ pulsarServiceList.get(0)
+ .getTransactionMetadataStoreService()
+ .removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+ }
+
+ final Throwable ex = syncGetException((
+ (PulsarClientImpl)
pulsarClientOther).getTcClient().commitAsync(transaction.getTxnID())
+ );
+ if (actor.equals("client") || actor.equals("admin")) {
+ Assert.assertNull(ex);
+ Assert.assertEquals(consumer.receive(5,
TimeUnit.SECONDS).getValue(), "message");
+ } else {
+ Assert.assertNotNull(ex);
+ Assert.assertTrue(ex instanceof
TransactionCoordinatorClientException, ex.getClass().getName());
+ Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
+ transaction.commit().get();
+ Assert.assertEquals(consumer.receive(5,
TimeUnit.SECONDS).getValue(), "message");
+ }
+ }
+
+ @Test(dataProvider = "actors")
+ public void testAddPartitionToTxn(String actor, boolean afterUnload)
throws Exception {
+ @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
"client")))
+ .enableTransaction(true)
+ .build();
+
+ @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
actor)))
+ .enableTransaction(true)
+ .build();
+ Transaction transaction = pulsarClientOwner.newTransaction()
+ .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+ if (afterUnload) {
+ pulsarServiceList.get(0)
+ .getTransactionMetadataStoreService()
+ .removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+ }
+
+ final Throwable ex = syncGetException(((PulsarClientImpl)
pulsarClientOther)
+
.getTcClient().addPublishPartitionToTxnAsync(transaction.getTxnID(),
List.of(TOPIC)));
+
+ final TxnMeta txnMeta =
pulsarServiceList.get(0).getTransactionMetadataStoreService()
+ .getTxnMeta(transaction.getTxnID()).get();
+ if (actor.equals("client") || actor.equals("admin")) {
+ Assert.assertNull(ex);
+ Assert.assertEquals(txnMeta.producedPartitions(), List.of(TOPIC));
+ } else {
+ Assert.assertNotNull(ex);
+ Assert.assertTrue(ex instanceof
TransactionCoordinatorClientException);
+ Assert.assertTrue(txnMeta.producedPartitions().isEmpty());
+ }
+ }
+
+ @Test(dataProvider = "actors")
+ public void testAddSubscriptionToTxn(String actor, boolean afterUnload)
throws Exception {
+ @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
"client")))
+ .enableTransaction(true)
+ .build();
+
+ @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .authentication(AuthenticationFactory.token(generateToken(kp,
actor)))
+ .enableTransaction(true)
+ .build();
+ Transaction transaction = pulsarClientOwner.newTransaction()
+ .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+ if (afterUnload) {
+ pulsarServiceList.get(0)
+ .getTransactionMetadataStoreService()
+ .removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+ }
+
+
+ final Throwable ex = syncGetException(((PulsarClientImpl)
pulsarClientOther)
+
.getTcClient().addSubscriptionToTxnAsync(transaction.getTxnID(), TOPIC, "sub"));
+
+ final TxnMeta txnMeta =
pulsarServiceList.get(0).getTransactionMetadataStoreService()
+ .getTxnMeta(transaction.getTxnID()).get();
+ if (actor.equals("client") || actor.equals("admin")) {
+ Assert.assertNull(ex);
+ Assert.assertEquals(txnMeta.ackedPartitions().size(), 1);
+ } else {
+ Assert.assertNotNull(ex);
+ Assert.assertTrue(ex instanceof
TransactionCoordinatorClientException);
+ Assert.assertTrue(txnMeta.ackedPartitions().isEmpty());
+ }
+ }
+
+ @Test
+ public void testNoAuth() throws Exception {
+ try {
+ @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .enableTransaction(true)
+ .build();
+ Assert.fail("should have failed");
+ } catch (Exception t) {
+ Assert.assertTrue(Exceptions.areExceptionsPresentInChain(t,
+ PulsarClientException.AuthenticationException.class));
+ }
+ }
+
+ private static Throwable syncGetException(CompletableFuture<?> future) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ return e;
+ } catch (ExecutionException e) {
+ return FutureUtil.unwrapCompletionException(e);
+ }
+ return null;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 444c68df055..fd49354342f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -35,7 +35,10 @@ import
org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -71,7 +74,9 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
if (admin != null) {
admin.close();
}
- admin =
spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
+ admin = spy(
+
createNewPulsarAdmin(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()))
+ );
if (pulsarClient != null) {
pulsarClient.shutdown();
@@ -82,6 +87,15 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
private void init() throws Exception {
startBroker();
}
+
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ return clientBuilder.build();
+ }
+
+ protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder)
throws PulsarClientException {
+ return builder.build();
+ }
+
protected void setUpBase(int numBroker,int numPartitionsOfTC, String
topic, int numPartitions) throws Exception{
setBrokerCount(numBroker);
internalSetup();
@@ -108,11 +122,10 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
if (pulsarClient != null) {
pulsarClient.shutdown();
}
- pulsarClient = PulsarClient.builder()
+ pulsarClient = createNewPulsarClient(PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
+ .enableTransaction(true));
}
protected void createTransactionCoordinatorAssign(int numPartitionsOfTC)
throws MetadataStoreException {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 663c1c50ce7..696a0a7957c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -896,7 +896,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
long timeoutCountOriginal = transactionMetadataStores.stream()
.mapToLong(store ->
store.getMetadataStoreStats().timeoutCount).sum();
TxnID txnID =
pulsarServiceList.get(0).getTransactionMetadataStoreService()
- .newTransaction(new TransactionCoordinatorID(0), 1).get();
+ .newTransaction(new TransactionCoordinatorID(0), 1,
null).get();
Awaitility.await().until(() -> {
try {
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
index 73fb43e45da..d22b956e8b0 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
@@ -41,4 +41,7 @@ public class TransactionMetadata {
/** The ackedPartitions of this transaction. */
public Map<String, Map<String, TransactionInPendingAckStats>>
ackedPartitions;
+
+ /** The owner of this transaction. */
+ public String owner;
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 7f8280f5226..ff5adb4d409 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -56,11 +56,12 @@ public interface TransactionMetadataStore {
* Create a new transaction in the transaction metadata store.
*
* @param timeoutInMills the timeout duration of the transaction in mills
+* @param owner the role which is the owner of the transaction
* @return a future represents the result of creating a new transaction.
* it returns {@link TxnID} as the identifier for identifying the
* transaction.
*/
- CompletableFuture<TxnID> newTransaction(long timeoutInMills);
+ CompletableFuture<TxnID> newTransaction(long timeoutInMills, String owner);
/**
* Add the produced partitions to transaction identified by <tt>txnid</tt>.
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index 104ae0ff1df..44f225b9448 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -107,4 +107,11 @@ public interface TxnMeta {
* @return transaction timeout at.
*/
long getTimeoutAt();
+
+ /**
+ * Return the transaction's owner.
+ *
+ * @return transaction's owner.
+ */
+ String getOwner();
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index a62df1df8c6..0f3c5e42d7a 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -73,12 +74,17 @@ class InMemTransactionMetadataStore implements
TransactionMetadataStore {
}
@Override
- public CompletableFuture<TxnID> newTransaction(long timeoutInMills) {
+ public CompletableFuture<TxnID> newTransaction(long timeoutInMills, String
owner) {
+ if (owner != null) {
+ if (StringUtils.isBlank(owner)) {
+ return CompletableFuture.failedFuture(new
IllegalArgumentException("Owner can't be blank"));
+ }
+ }
TxnID txnID = new TxnID(
tcID.getId(),
localID.getAndIncrement()
);
- TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(),
timeoutInMills);
+ TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(),
timeoutInMills, owner);
transactions.put(txnID, txn);
return CompletableFuture.completedFuture(txnID);
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 903b04a1b5d..b6eaad2e3e3 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
@@ -149,8 +150,11 @@ public class MLTransactionMetadataStore
positions.add(position);
long openTimestamp =
transactionMetadataEntry.getStartTime();
long timeoutAt =
transactionMetadataEntry.getTimeoutMs();
- txnMetaMap.put(transactionId,
MutablePair.of(new TxnMetaImpl(txnID,
- openTimestamp, timeoutAt),
positions));
+ final String owner =
transactionMetadataEntry.hasOwner()
+ ?
transactionMetadataEntry.getOwner() : null;
+ final TxnMetaImpl left = new
TxnMetaImpl(txnID,
+ openTimestamp, timeoutAt, owner);
+ txnMetaMap.put(transactionId,
MutablePair.of(left, positions));
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
timeoutAt + openTimestamp);
}
@@ -224,50 +228,58 @@ public class MLTransactionMetadataStore
}
@Override
- public CompletableFuture<TxnID> newTransaction(long timeOut) {
- if (this.maxActiveTransactionsPerCoordinator != 0
- && this.maxActiveTransactionsPerCoordinator <=
txnMetaMap.size()) {
+ public CompletableFuture<TxnID> newTransaction(long timeOut, String owner)
{
+ if (this.maxActiveTransactionsPerCoordinator == 0
+ || this.maxActiveTransactionsPerCoordinator >
txnMetaMap.size()) {
+ CompletableFuture<TxnID> completableFuture = new
CompletableFuture<>();
+ FutureUtil.safeRunAsync(() -> {
+ if (!checkIfReady()) {
+ completableFuture.completeExceptionally(new
CoordinatorException
+ .TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "new Transaction"));
+ return;
+ }
+
+ long mostSigBits = tcID.getId();
+ long leastSigBits = sequenceIdGenerator.generateSequenceId();
+ TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+ long currentTimeMillis = System.currentTimeMillis();
+ TransactionMetadataEntry transactionMetadataEntry = new
TransactionMetadataEntry()
+ .setTxnidMostBits(mostSigBits)
+ .setTxnidLeastBits(leastSigBits)
+ .setStartTime(currentTimeMillis)
+ .setTimeoutMs(timeOut)
+
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+ .setLastModificationTime(currentTimeMillis)
+
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ if (owner != null) {
+ if (StringUtils.isBlank(owner)) {
+ completableFuture.completeExceptionally(new
IllegalArgumentException("Owner can't be blank"));
+ return;
+ }
+ transactionMetadataEntry.setOwner(owner);
+ }
+ transactionLog.append(transactionMetadataEntry)
+ .whenComplete((position, throwable) -> {
+ if (throwable != null) {
+
completableFuture.completeExceptionally(throwable);
+ } else {
+ appendLogCount.increment();
+ TxnMeta txn = new TxnMetaImpl(txnID,
currentTimeMillis, timeOut, owner);
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ Pair<TxnMeta, List<Position>> pair =
MutablePair.of(txn, positions);
+ txnMetaMap.put(leastSigBits, pair);
+
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+ createdTransactionCount.increment();
+ completableFuture.complete(txnID);
+ }
+ });
+ }, internalPinnedExecutor, completableFuture);
+ return completableFuture;
+ } else {
return FutureUtil.failedFuture(new
CoordinatorException.ReachMaxActiveTxnException("New txn op "
+ "reach max active txn! tcId : " +
getTransactionCoordinatorID().getId()));
}
- CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
- FutureUtil.safeRunAsync(() -> {
- if (!checkIfReady()) {
- completableFuture.completeExceptionally(new
CoordinatorException
- .TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "new Transaction"));
- return;
- }
-
- long mostSigBits = tcID.getId();
- long leastSigBits = sequenceIdGenerator.generateSequenceId();
- TxnID txnID = new TxnID(mostSigBits, leastSigBits);
- long currentTimeMillis = System.currentTimeMillis();
- TransactionMetadataEntry transactionMetadataEntry = new
TransactionMetadataEntry()
- .setTxnidMostBits(mostSigBits)
- .setTxnidLeastBits(leastSigBits)
- .setStartTime(currentTimeMillis)
- .setTimeoutMs(timeOut)
-
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
- .setLastModificationTime(currentTimeMillis)
-
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- transactionLog.append(transactionMetadataEntry)
- .whenComplete((position, throwable) -> {
- if (throwable != null) {
- completableFuture.completeExceptionally(throwable);
- } else {
- appendLogCount.increment();
- TxnMeta txn = new TxnMetaImpl(txnID,
currentTimeMillis, timeOut);
- List<Position> positions = new ArrayList<>();
- positions.add(position);
- Pair<TxnMeta, List<Position>> pair =
MutablePair.of(txn, positions);
- txnMetaMap.put(leastSigBits, pair);
- this.timeoutTracker.addTransaction(leastSigBits,
timeOut);
- createdTransactionCount.increment();
- completableFuture.complete(txnID);
- }
- });
- }, internalPinnedExecutor, completableFuture);
- return completableFuture;
}
@Override
@@ -300,9 +312,9 @@ public class MLTransactionMetadataStore
promise.complete(null);
} catch (InvalidTxnStatusException e) {
transactionLog.deletePosition(Collections.singletonList(position));
- log.error("TxnID : " +
txnMetaListPair.getLeft().id().toString()
- + " add produced partition error with
TxnStatus : "
- +
txnMetaListPair.getLeft().status().name(), e);
+ log.error("TxnID {} add produced partition
error"
+ + " with TxnStatus: {}",
txnMetaListPair.getLeft().id().toString()
+ ,
txnMetaListPair.getLeft().status().name(), e);
promise.completeExceptionally(e);
}
});
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 8e0c793b588..ed38305abf0 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -42,11 +42,13 @@ class TxnMetaImpl implements TxnMeta {
private volatile TxnStatus txnStatus = TxnStatus.OPEN;
private final long openTimestamp;
private final long timeoutAt;
+ private final String owner;
- TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt) {
+ TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner)
{
this.txnID = txnID;
this.openTimestamp = openTimestamp;
this.timeoutAt = timeoutAt;
+ this.owner = owner;
}
@Override
@@ -161,4 +163,8 @@ class TxnMetaImpl implements TxnMeta {
return this.timeoutAt;
}
+ @Override
+ public String getOwner() {
+ return this.owner;
+ }
}
diff --git
a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
index 6d506d48176..134d1cf3b51 100644
---
a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+++
b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -51,6 +51,7 @@ message TransactionMetadataEntry {
optional uint64 start_time = 9;
optional uint64 last_modification_time = 10;
optional uint64 max_local_txn_id = 11;
+ optional string owner = 12;
}
message BatchedTransactionMetadataEntry{
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 2f806f0a474..3b831ad38ba 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -100,7 +100,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
break;
}
if (transactionMetadataStore.checkIfReady()) {
- TxnID txnID =
transactionMetadataStore.newTransaction(5000).get();
+ TxnID txnID = transactionMetadataStore.newTransaction(5000,
null).get();
assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(),
TxnStatus.OPEN);
List<String> partitions = new ArrayList<>();
@@ -181,7 +181,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
transactionMetadataStore.init(new
TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
- TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
+ TxnID txnID = transactionMetadataStore.newTransaction(20000,
null).get();
transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING,
TxnStatus.OPEN, false).get();
if (isUseManagedLedgerProperties) {
transactionMetadataStore.updateTxnStatus(txnID,
TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
@@ -209,7 +209,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
transactionMetadataStore.init(new
TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
- txnID = transactionMetadataStore.newTransaction(100000).get();
+ txnID = transactionMetadataStore.newTransaction(100000, null).get();
assertEquals(txnID.getLeastSigBits(), 1);
}
@@ -244,10 +244,8 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
break;
}
if (transactionMetadataStore.checkIfReady()) {
- CompletableFuture<TxnID> txIDFuture1 =
transactionMetadataStore.newTransaction(1000);
- CompletableFuture<TxnID> txIDFuture2 =
transactionMetadataStore.newTransaction(1000);
- TxnID txnID1 = txIDFuture1.get();
- TxnID txnID2 = txIDFuture2.get();
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000,
"user1").get();
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000,
"user2").get();
assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(),
TxnStatus.OPEN);
assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(),
TxnStatus.OPEN);
@@ -306,6 +304,9 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
assertEquals(txnMeta2.ackedPartitions().size(),
subscriptions.size());
Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions()));
Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions()));
+
+ assertEquals(txnMeta1.getOwner(), "user1");
+ assertEquals(txnMeta2.getOwner(), "user2");
assertEquals(txnMeta1.status(), TxnStatus.COMMITTING);
assertEquals(txnMeta2.status(), TxnStatus.COMMITTING);
transactionMetadataStoreTest
@@ -325,7 +326,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof
TransactionNotFoundException);
}
- TxnID txnID =
transactionMetadataStoreTest.newTransaction(1000).get();
+ TxnID txnID =
transactionMetadataStoreTest.newTransaction(1000, null).get();
assertEquals(txnID.getLeastSigBits(), 2L);
break;
} else {
@@ -370,10 +371,8 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
break;
}
if (transactionMetadataStore.checkIfReady()) {
- CompletableFuture<TxnID> txIDFuture1 =
transactionMetadataStore.newTransaction(1000);
- CompletableFuture<TxnID> txIDFuture2 =
transactionMetadataStore.newTransaction(1000);
- TxnID txnID1 = txIDFuture1.get();
- TxnID txnID2 = txIDFuture2.get();
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000,
null).get();
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000,
null).get();
assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(),
TxnStatus.OPEN);
assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(),
TxnStatus.OPEN);
@@ -447,9 +446,9 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
Awaitility.await().until(transactionMetadataStore::checkIfReady);
// txnID1 have not deleted from cursor, we can recover from
transaction log
- TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000,
null).get();
// txnID2 have deleted from cursor.
- TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000,
null).get();
transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING,
TxnStatus.OPEN, false).get();
transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED,
TxnStatus.ABORTING, false).get();
@@ -485,7 +484,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
transactionMetadataStore.init(new
TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
- transactionMetadataStore.newTransaction(5000).get();
+ transactionMetadataStore.newTransaction(5000, null).get();
Field field =
MLTransactionLogImpl.class.getDeclaredField("managedLedger");
field.setAccessible(true);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
field.get(mlTransactionLog);
@@ -494,12 +493,12 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
AtomicReferenceFieldUpdater state = (AtomicReferenceFieldUpdater)
field.get(managedLedger);
state.set(managedLedger, WriteFailed);
try {
- transactionMetadataStore.newTransaction(5000).get();
+ transactionMetadataStore.newTransaction(5000, null).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof
ManagedLedgerException.ManagedLedgerAlreadyClosedException);
}
- transactionMetadataStore.newTransaction(5000).get();
+ transactionMetadataStore.newTransaction(5000, null).get();
}
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index c7f9cd21ac8..04b2d2fe650 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -92,14 +92,14 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testGetTxnStatusSuccess() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
}
@Test
public void testUpdateTxnStatusSuccess() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -113,7 +113,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -132,7 +132,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testUpdateTxnStatusCannotTransition() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -151,7 +151,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testAddProducedPartition() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -205,7 +205,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testAddAckedPartition() throws Exception {
- TxnID txnID = this.store.newTransaction(0L).get();
+ TxnID txnID = this.store.newTransaction(0L, null).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);