This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb9eda20691ea435d741640c2bddd69f80017052
Author: ken <[email protected]>
AuthorDate: Mon Nov 3 17:48:20 2025 +0800

    [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)

    Co-authored-by: fanjianye <[email protected]>
    (cherry picked from commit b297f1f5f0332a892da742ca6e2ff87d2600296f)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 +++++++-
 .../broker/transaction/buffer/impl/TransactionBufferDisable.java | 2 +-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 221bb577d4b..c469a82b87b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4042,6 +4042,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         if (lastDispatchablePosition != null) {
             return CompletableFuture.completedFuture(lastDispatchablePosition);
         }
+        PositionImpl lastPosition;
+        if (transactionBuffer instanceof TransactionBufferDisable) {
+            lastPosition = (PositionImpl) getLastPosition();
+        } else {
+            lastPosition = getMaxReadPosition();
+        }
         return ManagedLedgerImplUtils
                 .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
                     MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
@@ -4054,7 +4060,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
                     }
                     return true;
-                }, getMaxReadPosition())
+                }, lastPosition)
                 .thenApply(position -> {
                     // Update lastDispatchablePosition to the given position
                     updateLastDispatchablePosition(position);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 6f5dc0cd4d0..e226e0e4e79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -109,7 +109,7 @@ public class TransactionBufferDisable implements TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return (PositionImpl) topic.getLastPosition();
+        return PositionImpl.LATEST;
     }
 
     @Override
