This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b297f1f5f03 [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)
b297f1f5f03 is described below

commit b297f1f5f0332a892da742ca6e2ff87d2600296f
Author: ken <[email protected]>
AuthorDate: Mon Nov 3 17:48:20 2025 +0800

    [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 8 +++++++-
 .../broker/transaction/buffer/impl/TransactionBufferDisable.java  | 3 ++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1f424df17a1..2f0255f4f2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4140,6 +4140,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         if (lastDispatchablePosition != null) {
             return CompletableFuture.completedFuture(lastDispatchablePosition);
         }
+        Position lastPosition;
+        if (transactionBuffer instanceof TransactionBufferDisable) {
+            lastPosition = getLastPosition();
+        } else {
+            lastPosition = getMaxReadPosition();
+        }
         return ledger.getLastDispatchablePosition(entry -> {
             MessageMetadata md = entry.getMessageMetadata();
             if (md == null) {
@@ -4154,7 +4160,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 return !isTxnAborted(txnID, entry.getPosition());
             }
             return true;
-        }, getMaxReadPosition()).thenApply(position -> {
+        }, lastPosition).thenApply(position -> {
             // Update lastDispatchablePosition to the given position
             updateLastDispatchablePosition(position);
             return position;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index d4fd071fef8..f6e2ad04e50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -108,7 +109,7 @@ public class TransactionBufferDisable implements TransactionBuffer {
 
     @Override
     public Position getMaxReadPosition() {
-        return topic.getLastPosition();
+        return PositionFactory.LATEST;
     }
 
     @Override

Reply via email to