This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 266243cae24 [improve][broker] Optimize 
PersistentTopic.getLastDispatchablePosition (#22707)
266243cae24 is described below

commit 266243cae246a6fa52b4b6c626932885ad44cbf4
Author: 道君 <[email protected]>
AuthorDate: Tue Jun 11 23:45:12 2024 +0800

    [improve][broker] Optimize PersistentTopic.getLastDispatchablePosition 
(#22707)
    
    ### Motivation
    
    
[PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788)
 is using by
    
[Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116)
 , 
[ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448),
 
[Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615).
    
    The current implementation is read entries from Bookkeeper(or sth else), 
which leads to low throughput, high latency and heavy load, this PR is for the 
purpose of optimization.
---
 .../broker/service/persistent/PersistentTopic.java | 66 ++++++++++++++++++----
 .../buffer/impl/InMemTransactionBuffer.java        | 14 ++++-
 .../buffer/impl/TopicTransactionBuffer.java        | 11 ++++
 .../buffer/impl/TransactionBufferDisable.java      | 14 ++++-
 4 files changed, 89 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eb15e31b49b..d78dac899b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -317,6 +317,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         Long estimatedOldestUnacknowledgedMessageTimestamp;
     }
 
+    // The last position that can be dispatched to consumers
+    private volatile Position lastDispatchablePosition;
+
     /***
      * We use 3 futures to prevent a new closing if there is an in-progress 
deletion or closing.  We make Pulsar return
      * the in-progress one when it is called the second time.
@@ -3792,18 +3795,57 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public CompletableFuture<Position> getLastDispatchablePosition() {
-        return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
-            MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-            // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-            if (Markers.isServerOnlyMarker(md)) {
-                return false;
-            } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
-                // Filter-out transaction aborted messages.
-                TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
-                return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());
-            }
-            return true;
-        }, getMaxReadPosition());
+        if (lastDispatchablePosition != null) {
+            return CompletableFuture.completedFuture(lastDispatchablePosition);
+        }
+        return ManagedLedgerImplUtils
+                .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry 
-> {
+                    MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                    // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
+                    if (Markers.isServerOnlyMarker(md)) {
+                        return false;
+                    } else if (md.hasTxnidMostBits() && 
md.hasTxnidLeastBits()) {
+                        // Filter-out transaction aborted messages.
+                        TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+                        return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());
+                    }
+                    return true;
+                }, getMaxReadPosition())
+                .thenApply(position -> {
+                    // Update lastDispatchablePosition to the given position
+                    updateLastDispatchablePosition(position);
+                    return position;
+                });
+    }
+
+    /**
+     * Update lastDispatchablePosition if the given position is greater than 
the lastDispatchablePosition.
+     *
+     * @param position
+     */
+    public synchronized void updateLastDispatchablePosition(Position position) 
{
+        // Update lastDispatchablePosition to null if the position is null, 
fallback to
+        // ManagedLedgerImplUtils#asyncGetLastValidPosition
+        if (position == null) {
+            lastDispatchablePosition = null;
+            return;
+        }
+
+        PositionImpl position0 = (PositionImpl) position;
+        // If the position is greater than the maxReadPosition, ignore
+        if (position0.compareTo(getMaxReadPosition()) > 0) {
+            return;
+        }
+        // If the lastDispatchablePosition is null, set it to the position
+        if (lastDispatchablePosition == null) {
+            lastDispatchablePosition = position;
+            return;
+        }
+        // If the position is greater than the lastDispatchablePosition, 
update it
+        PositionImpl lastDispatchablePosition0 = (PositionImpl) 
lastDispatchablePosition;
+        if (position0.compareTo(lastDispatchablePosition0) > 0) {
+            lastDispatchablePosition = position;
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index bab7b64c608..533d0716d41 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -377,8 +377,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     @Override
     public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
-        if (!isMarkerMessage && maxReadPositionCallBack != null) {
-            maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+        if (!isMarkerMessage) {
+            updateLastDispatchablePosition(position);
+            if (maxReadPositionCallBack != null) {
+                maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+            }
         }
     }
 
@@ -436,4 +439,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
                 .filter(txnBuffer -> 
txnBuffer.status.equals(TxnStatus.COMMITTED))
                 .count();
     }
+
+    // ThreadSafe
+    private void updateLastDispatchablePosition(Position position) {
+        if (topic instanceof PersistentTopic t) {
+            t.updateLastDispatchablePosition(position);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index dfb73815e08..fbd4ddf7da0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -297,6 +297,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         }
     }
 
+    // ThreadSafe
+    private void updateLastDispatchablePosition(Position position) {
+        topic.updateLastDispatchablePosition(position);
+    }
+
     @Override
     public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
         return null;
@@ -459,6 +464,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         } else {
             updateMaxReadPosition((PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
+        // Update the last dispatchable position to null if there is a TXN 
finished.
+        updateLastDispatchablePosition(null);
     }
 
     /**
@@ -523,6 +530,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 }
             }
         }
+        // If the message is a normal message, update the last dispatchable 
position.
+        if (!isMarkerMessage) {
+            updateLastDispatchablePosition(position);
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index ebd61dbaa82..6f5dc0cd4d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -99,8 +99,11 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
 
     @Override
     public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
-        if (!isMarkerMessage && maxReadPositionCallBack != null) {
-            maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+        if (!isMarkerMessage) {
+            updateLastDispatchablePosition(position);
+            if (maxReadPositionCallBack != null) {
+                maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+            }
         }
     }
 
@@ -148,4 +151,11 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
     public long getCommittedTxnCount() {
         return 0;
     }
+
+    // ThreadSafe
+    private void updateLastDispatchablePosition(Position position) {
+        if (topic instanceof PersistentTopic t) {
+            t.updateLastDispatchablePosition(position);
+        }
+    }
 }

Reply via email to