This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 46d618aa305 [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)
46d618aa305 is described below
commit 46d618aa3055c0befd0295bfa6aabedeb0358f6b
Author: ken <[email protected]>
AuthorDate: Mon Nov 3 17:48:20 2025 +0800
[fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)
Co-authored-by: fanjianye <[email protected]>
(cherry picked from commit b297f1f5f0332a892da742ca6e2ff87d2600296f)
---
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 +++++++-
.../broker/transaction/buffer/impl/TransactionBufferDisable.java | 2 +-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5b64e7deedc..7ae64c9655d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3670,6 +3670,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (lastDispatchablePosition != null) {
return CompletableFuture.completedFuture(lastDispatchablePosition);
}
+ PositionImpl lastPosition;
+ if (transactionBuffer instanceof TransactionBufferDisable) {
+ lastPosition = (PositionImpl) getLastPosition();
+ } else {
+ lastPosition = getMaxReadPosition();
+ }
return ManagedLedgerImplUtils
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry
-> {
MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
@@ -3682,7 +3688,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
}
return true;
- }, getMaxReadPosition())
+ }, lastPosition)
.thenApply(position -> {
// Update lastDispatchablePosition to the given position
updateLastDispatchablePosition(position);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 8fa308fccbb..72214b2cd84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -107,7 +107,7 @@ public class TransactionBufferDisable implements TransactionBuffer {
@Override
public PositionImpl getMaxReadPosition() {
- return (PositionImpl) topic.getLastPosition();
+ return PositionImpl.LATEST;
}
@Override