This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fb9eda20691ea435d741640c2bddd69f80017052
Author: ken <[email protected]>
AuthorDate: Mon Nov 3 17:48:20 2025 +0800

    [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should 
return latest (#24898)
    
    Co-authored-by: fanjianye <[email protected]>
    (cherry picked from commit b297f1f5f0332a892da742ca6e2ff87d2600296f)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 8 +++++++-
 .../broker/transaction/buffer/impl/TransactionBufferDisable.java  | 2 +-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 221bb577d4b..c469a82b87b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4042,6 +4042,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         if (lastDispatchablePosition != null) {
             return CompletableFuture.completedFuture(lastDispatchablePosition);
         }
+        PositionImpl lastPosition;
+        if (transactionBuffer instanceof TransactionBufferDisable) {
+            lastPosition = (PositionImpl) getLastPosition();
+        } else {
+            lastPosition = getMaxReadPosition();
+        }
         return ManagedLedgerImplUtils
                 .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry 
-> {
                     MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
@@ -4054,7 +4060,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());
                     }
                     return true;
-                }, getMaxReadPosition())
+                }, lastPosition)
                 .thenApply(position -> {
                     // Update lastDispatchablePosition to the given position
                     updateLastDispatchablePosition(position);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 6f5dc0cd4d0..e226e0e4e79 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -109,7 +109,7 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return (PositionImpl) topic.getLastPosition();
+        return PositionImpl.LATEST;
     }
 
     @Override

Reply via email to