liangyepianzhou commented on code in PR #15592:
URL: https://github.com/apache/pulsar/pull/15592#discussion_r876957202
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -384,30 +376,36 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
}
private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
- if (!ongoingTxns.isEmpty()) {
- TxnID firstTxn = ongoingTxns.firstKey();
- if (firstTxn.getMostSigBits() == txnID.getMostSigBits() &&
lowWaterMark >= firstTxn.getLeastSigBits()) {
- ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
- firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
- try {
- topic.getManagedLedger().asyncAddEntry(abortMarker, new
AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- aborts.put(firstTxn, (PositionImpl) position);
- updateMaxReadPosition(firstTxn);
- }
- }
-
- @Override
- public void addFailed(ManagedLedgerException
exception, Object ctx) {
- log.error("Failed to abort low water mark for txn
{}", txnID, exception);
- }
- }, null);
- } finally {
- abortMarker.release();
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark)
-> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
+ if (handleLowWaterMark.tryAcquire()) {
+ if (!ongoingTxns.isEmpty()) {
+ TxnID firstTxn = ongoingTxns.firstKey();
+ long tCId = firstTxn.getMostSigBits();
+ Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+ if (lowWaterMarkOfFirstTxnId != null &&
firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+ abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+ .thenRun(() -> {
+ log.warn("Successes to abort low water mark
for txn [{}], topic [{}],"
+ + " lowWaterMark [{}]", firstTxn,
topic.getName(), lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ })
+ .exceptionally(ex -> {
+ log.warn("Failed to abort low water mark for
txn {}, topic [{}], "
+ + "lowWaterMark [{}], ", firstTxn,
topic.getName(), lowWaterMarkOfFirstTxnId,
+ ex);
+ handleLowWaterMark.release();
+ return null;
+ });
+ return;
}
}
+ handleLowWaterMark.release();
Review Comment:
f () {
    if () {
        abort, then release semaphore
        return
    }}
release semaphore
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]