liangyepianzhou commented on code in PR #15592:
URL: https://github.com/apache/pulsar/pull/15592#discussion_r876957202


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -384,30 +376,36 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (!ongoingTxns.isEmpty()) {
-            TxnID firstTxn = ongoingTxns.firstKey();
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && 
lowWaterMark >= firstTxn.getLeastSigBits()) {
-                ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
-                        firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
-                try {
-                    topic.getManagedLedger().asyncAddEntry(abortMarker, new 
AsyncCallbacks.AddEntryCallback() {
-                        @Override
-                        public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
-                            synchronized (TopicTransactionBuffer.this) {
-                                aborts.put(firstTxn, (PositionImpl) position);
-                                updateMaxReadPosition(firstTxn);
-                            }
-                        }
-
-                        @Override
-                        public void addFailed(ManagedLedgerException 
exception, Object ctx) {
-                            log.error("Failed to abort low water mark for txn 
{}", txnID, exception);
-                        }
-                    }, null);
-                } finally {
-                    abortMarker.release();
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) 
-> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+        if (handleLowWaterMark.tryAcquire()) {
+            if (!ongoingTxns.isEmpty()) {
+                TxnID firstTxn = ongoingTxns.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && 
firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+                            .thenRun(() -> {
+                                log.warn("Successes to abort low water mark 
for txn [{}], topic [{}],"
+                                        + " lowWaterMark [{}]", firstTxn, 
topic.getName(), lowWaterMarkOfFirstTxnId);
+                                handleLowWaterMark.release();
+                            })
+                            .exceptionally(ex -> {
+                                log.warn("Failed to abort low water mark for 
txn {}, topic [{}], "
+                                        + "lowWaterMark [{}], ", firstTxn, 
topic.getName(), lowWaterMarkOfFirstTxnId,
+                                        ex);
+                                handleLowWaterMark.release();
+                                return null;
+                            });
+                    return;
                 }
             }
+            handleLowWaterMark.release();

Review Comment:
   f () {
   if () {
   abort then release semaphore
   return
   }}
   release semaphore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to