This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 266243cae24 [improve][broker] Optimize
PersistentTopic.getLastDispatchablePosition (#22707)
266243cae24 is described below
commit 266243cae246a6fa52b4b6c626932885ad44cbf4
Author: 道君 <[email protected]>
AuthorDate: Tue Jun 11 23:45:12 2024 +0800
[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition
(#22707)
### Motivation
[PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788)
is used by
[Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116),
[ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448),
[Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615).
The current implementation reads entries from BookKeeper (or another storage
backend) on every call, which leads to low throughput, high latency, and heavy
load. This PR optimizes it by caching the last dispatchable position on the topic.
---
.../broker/service/persistent/PersistentTopic.java | 66 ++++++++++++++++++----
.../buffer/impl/InMemTransactionBuffer.java | 14 ++++-
.../buffer/impl/TopicTransactionBuffer.java | 11 ++++
.../buffer/impl/TransactionBufferDisable.java | 14 ++++-
4 files changed, 89 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eb15e31b49b..d78dac899b7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -317,6 +317,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Long estimatedOldestUnacknowledgedMessageTimestamp;
}
+ // The last position that can be dispatched to consumers
+ private volatile Position lastDispatchablePosition;
+
/***
* We use 3 futures to prevent a new closing if there is an in-progress
deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
@@ -3792,18 +3795,57 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
- return
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
entry -> {
- MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
- // If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
- if (Markers.isServerOnlyMarker(md)) {
- return false;
- } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
- // Filter-out transaction aborted messages.
- TxnID txnID = new TxnID(md.getTxnidMostBits(),
md.getTxnidLeastBits());
- return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
- }
- return true;
- }, getMaxReadPosition());
+ if (lastDispatchablePosition != null) {
+ return CompletableFuture.completedFuture(lastDispatchablePosition);
+ }
+ return ManagedLedgerImplUtils
+ .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry
-> {
+ MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ // If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
+ if (Markers.isServerOnlyMarker(md)) {
+ return false;
+ } else if (md.hasTxnidMostBits() &&
md.hasTxnidLeastBits()) {
+ // Filter-out transaction aborted messages.
+ TxnID txnID = new TxnID(md.getTxnidMostBits(),
md.getTxnidLeastBits());
+ return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
+ }
+ return true;
+ }, getMaxReadPosition())
+ .thenApply(position -> {
+ // Update lastDispatchablePosition to the given position
+ updateLastDispatchablePosition(position);
+ return position;
+ });
+ }
+
+ /**
+ * Update lastDispatchablePosition if the given position is greater than
the lastDispatchablePosition.
+ *
+ * @param position
+ */
+ public synchronized void updateLastDispatchablePosition(Position position)
{
+ // Update lastDispatchablePosition to null if the position is null,
fallback to
+ // ManagedLedgerImplUtils#asyncGetLastValidPosition
+ if (position == null) {
+ lastDispatchablePosition = null;
+ return;
+ }
+
+ PositionImpl position0 = (PositionImpl) position;
+ // If the position is greater than the maxReadPosition, ignore
+ if (position0.compareTo(getMaxReadPosition()) > 0) {
+ return;
+ }
+ // If the lastDispatchablePosition is null, set it to the position
+ if (lastDispatchablePosition == null) {
+ lastDispatchablePosition = position;
+ return;
+ }
+ // If the position is greater than the lastDispatchablePosition,
update it
+ PositionImpl lastDispatchablePosition0 = (PositionImpl)
lastDispatchablePosition;
+ if (position0.compareTo(lastDispatchablePosition0) > 0) {
+ lastDispatchablePosition = position;
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index bab7b64c608..533d0716d41 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -377,8 +377,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
- if (!isMarkerMessage && maxReadPositionCallBack != null) {
- maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ if (maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
}
@@ -436,4 +439,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
.filter(txnBuffer ->
txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}
+
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ if (topic instanceof PersistentTopic t) {
+ t.updateLastDispatchablePosition(position);
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index dfb73815e08..fbd4ddf7da0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -297,6 +297,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ topic.updateLastDispatchablePosition(position);
+ }
+
@Override
public CompletableFuture<TransactionBufferReader>
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
@@ -459,6 +464,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
} else {
updateMaxReadPosition((PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry(), false);
}
+ // Update the last dispatchable position to null if there is a TXN
finished.
+ updateLastDispatchablePosition(null);
}
/**
@@ -523,6 +530,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
}
+ // If the message is a normal message, update the last dispatchable
position.
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index ebd61dbaa82..6f5dc0cd4d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -99,8 +99,11 @@ public class TransactionBufferDisable implements
TransactionBuffer {
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
- if (!isMarkerMessage && maxReadPositionCallBack != null) {
- maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ if (maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
}
@@ -148,4 +151,11 @@ public class TransactionBufferDisable implements
TransactionBuffer {
public long getCommittedTxnCount() {
return 0;
}
+
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ if (topic instanceof PersistentTopic t) {
+ t.updateLastDispatchablePosition(position);
+ }
+ }
}