This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9937d22ffea [improve][broker] Use atomic counter for ongoing
transaction count (#25053)
9937d22ffea is described below
commit 9937d22ffea22b199fd5782153ebb4a9c2f25391
Author: Ruimin MA <[email protected]>
AuthorDate: Mon Dec 15 09:19:32 2025 +0800
[improve][broker] Use atomic counter for ongoing transaction count (#25053)
---
.../TransactionMetadataStoreServiceTest.java | 47 +++++++++-------------
.../impl/MLTransactionMetadataStore.java | 27 ++++++++++---
.../coordinator/impl/MLTransactionLogImplTest.java | 2 +-
3 files changed, 40 insertions(+), 36 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index df61d37564c..9f3776ca7cb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -173,10 +174,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
(MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- Field field =
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
- field.setAccessible(true);
- ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
- (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>)
field.get(transactionMetadataStore);
+ ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
transactionMetadataStore.getTxnMetaMap();
int i = -1;
while (++i < 1000) {
try {
@@ -189,7 +187,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(),
TxnStatus.OPEN));
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
- .until(() -> txnMap.size() == 0);
+ .until(() ->
transactionMetadataStore.getOnGoingTxnCount().intValue() == 0);
}
private TxnID newTransactionWithTimeoutOf(long timeout)
@@ -209,25 +207,23 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
(MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- Field field =
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
- field.setAccessible(true);
- ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
- (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>)
field.get(transactionMetadataStore);
+ ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
transactionMetadataStore.getTxnMetaMap();
+ LongAdder onGoingTxtCount =
transactionMetadataStore.getOnGoingTxnCount();
newTransactionWithTimeoutOf(2000);
- assertEquals(txnMap.size(), 1);
+ assertEquals(onGoingTxtCount.intValue(), 1);
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(),
TxnStatus.OPEN));
- Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() ->
txnMap.size() == 0);
+ Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() ->
onGoingTxtCount.intValue() == 0);
newTransactionWithTimeoutOf(2000);
- assertEquals(txnMap.size(), 1);
+ assertEquals(onGoingTxtCount.intValue(), 1);
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(),
TxnStatus.OPEN));
- Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() ->
txnMap.size() == 0);
+ Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() ->
onGoingTxtCount.intValue() == 0);
}
@Test
@@ -241,10 +237,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
- Field field =
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
- field.setAccessible(true);
- ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
- (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>)
field.get(transactionMetadataStore);
+ LongAdder onGoingTxtCount =
transactionMetadataStore.getOnGoingTxnCount();
new Thread(() -> {
int i = -1;
while (++i < 100) {
@@ -289,15 +282,15 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
}
}).start();
- checkoutTimeout(txnMap, 300);
- checkoutTimeout(txnMap, 200);
- checkoutTimeout(txnMap, 100);
- checkoutTimeout(txnMap, 0);
+ checkoutTimeout(onGoingTxtCount, 300);
+ checkoutTimeout(onGoingTxtCount, 200);
+ checkoutTimeout(onGoingTxtCount, 100);
+ checkoutTimeout(onGoingTxtCount, 0);
}
- private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta,
List<Position>>> txnMap, int time) {
+ private void checkoutTimeout(LongAdder onGoingTxtCount, int time) {
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
- .until(() -> txnMap.size() == time);
+ .until(() -> onGoingTxtCount.intValue() == time);
}
@Test
@@ -326,12 +319,8 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
-
- Field field =
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
- field.setAccessible(true);
- ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
- (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>)
field.get(transactionMetadataStore);
- Awaitility.await().until(() -> txnMap.size() == 0);
+ LongAdder onGoingTxtCount =
transactionMetadataStore.getOnGoingTxnCount();
+ Awaitility.await().until(() -> onGoingTxtCount.intValue() == 0);
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6bd7a947e38..526fe0fcb7f 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@ public class MLTransactionMetadataStore
private final TransactionCoordinatorID tcID;
private final MLTransactionLogImpl transactionLog;
@VisibleForTesting
+ @Getter
final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>
txnMetaMap = new ConcurrentSkipListMap<>();
private final TransactionTimeoutTracker timeoutTracker;
private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -80,6 +82,9 @@ public class MLTransactionMetadataStore
private final LongAdder abortedTransactionCount;
private final LongAdder transactionTimeoutCount;
private final LongAdder appendLogCount;
+ @Getter
+ @VisibleForTesting
+ private final LongAdder onGoingTxnCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
private final ExecutorService internalPinnedExecutor;
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
@@ -108,6 +113,7 @@ public class MLTransactionMetadataStore
this.abortedTransactionCount = new LongAdder();
this.transactionTimeoutCount = new LongAdder();
this.appendLogCount = new LongAdder();
+ this.onGoingTxnCount = new LongAdder();
DefaultThreadFactory threadFactory = new
DefaultThreadFactory("transaction_coordinator_"
+ tcID.toString() + "thread_factory");
this.internalPinnedExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
@@ -162,6 +168,7 @@ public class MLTransactionMetadataStore
final TxnMetaImpl left = new
TxnMetaImpl(txnID,
openTimestamp, timeoutAt, owner);
txnMetaMap.put(transactionId,
MutablePair.of(left, positions));
+ onGoingTxnCount.increment();
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
timeoutAt + openTimestamp);
}
@@ -197,8 +204,12 @@ public class MLTransactionMetadataStore
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
if (newStatus == TxnStatus.COMMITTED ||
newStatus == TxnStatus.ABORTED) {
transactionLog.deletePosition(txnMetaMap
-
.get(transactionId).getRight()).thenAccept(v ->
-
txnMetaMap.remove(transactionId).getLeft());
+
.get(transactionId).getRight()).thenAccept(v -> {
+ if
(txnMetaMap.remove(transactionId) != null) {
+
onGoingTxnCount.decrement();
+ }
+ }
+ );
}
}
break;
@@ -237,7 +248,7 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<TxnID> newTransaction(long timeOut, String owner)
{
if (this.maxActiveTransactionsPerCoordinator == 0
- || this.maxActiveTransactionsPerCoordinator >
txnMetaMap.size()) {
+ || this.maxActiveTransactionsPerCoordinator >
onGoingTxnCount.longValue()) {
CompletableFuture<TxnID> completableFuture = new
CompletableFuture<>();
FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
@@ -276,6 +287,7 @@ public class MLTransactionMetadataStore
positions.add(position);
Pair<TxnMeta, List<Position>> pair =
MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
+ onGoingTxnCount.increment();
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
completableFuture.complete(txnID);
@@ -422,7 +434,9 @@ public class MLTransactionMetadataStore
} else {
abortedTransactionCount.increment();
}
- txnMetaMap.remove(txnID.getLeastSigBits());
+ if
(txnMetaMap.remove(txnID.getLeastSigBits()) != null) {
+ onGoingTxnCount.decrement();
+ }
transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
log.warn("Failed to delete transaction
log position "
+ "at end transaction [{}]",
txnID);
@@ -466,7 +480,7 @@ public class MLTransactionMetadataStore
transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
transactionCoordinatorstats.setState(getState().name());
transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
- transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
+ transactionCoordinatorstats.ongoingTxnSize =
onGoingTxnCount.longValue();
transactionCoordinatorstats.recoverStartTime =
recoverTime.getRecoverStartTime();
transactionCoordinatorstats.recoverEndTime =
recoverTime.getRecoverEndTime();
return transactionCoordinatorstats;
@@ -490,6 +504,7 @@ public class MLTransactionMetadataStore
internalPinnedExecutor.shutdown();
return transactionLog.closeAsync().thenCompose(v -> {
txnMetaMap.clear();
+ onGoingTxnCount.reset();
this.timeoutTracker.close();
if (!this.changeToCloseState()) {
return FutureUtil.failedFuture(
@@ -508,7 +523,7 @@ public class MLTransactionMetadataStore
@Override
public TransactionMetadataStoreStats getMetadataStoreStats() {
this.transactionMetadataStoreStats.setCoordinatorId(tcID.getId());
- this.transactionMetadataStoreStats.setActives(txnMetaMap.size());
+
this.transactionMetadataStoreStats.setActives(onGoingTxnCount.intValue());
this.transactionMetadataStoreStats.setCreatedCount(this.createdTransactionCount.longValue());
this.transactionMetadataStoreStats.setCommittedCount(this.committedTransactionCount.longValue());
this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue());
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
index 5355256d369..052b2d193b9 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
@@ -171,7 +171,7 @@ public class MLTransactionLogImplTest extends
MockedBookKeeperTestCase {
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLogForRecover, timeoutTracker,
sequenceIdGenerator, Integer.MAX_VALUE);
transactionMetadataStoreForRecover.init(recoverTracker).get(2000,
TimeUnit.SECONDS);
-
Assert.assertEquals(transactionMetadataStoreForRecover.txnMetaMap.size(),
expectedMapping.size());
+
Assert.assertEquals(transactionMetadataStoreForRecover.getOnGoingTxnCount().intValue(),
expectedMapping.size());
Iterator<Integer> txnIdSet = expectedMapping.keySet().iterator();
while (txnIdSet.hasNext()){
int txnId = txnIdSet.next();