This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77b6378ae8b [improve][broker] Optimize the performance of individual
acknowledgments (#23072)
77b6378ae8b is described below
commit 77b6378ae8b9ac83962f71063ad44d6ac57f8e32
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 29 22:02:29 2024 +0800
[improve][broker] Optimize the performance of individual acknowledgments
(#23072)
---
.../org/apache/pulsar/broker/service/Consumer.java | 151 ++++++++++-----------
1 file changed, 69 insertions(+), 82 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index dca64395d86..7f46e8969eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -531,14 +532,16 @@ public class Consumer {
//this method is for individual ack not carry the transaction
private CompletableFuture<Long> individualAckNormal(CommandAck ack,
Map<String, Long> properties) {
- List<Position> positionsAcked = new ArrayList<>();
+ List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
Position position;
- long ackedCount = 0;
- long batchSize = getBatchSize(msgId);
- Consumer ackOwnerConsumer =
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+ Pair<Consumer, Long> ackOwnerConsumerAndBatchSize =
+ getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
msgId.getEntryId());
+ Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.getLeft();
+ long ackedCount;
+ long batchSize = ackOwnerConsumerAndBatchSize.getRight();
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
@@ -557,28 +560,32 @@ public class Consumer {
} else {
position = PositionFactory.create(msgId.getLedgerId(),
msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize,
position, ackOwnerConsumer);
- if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
+ if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer,
position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
}
- positionsAcked.add(position);
+ positionsAcked.add(Pair.of(ackOwnerConsumer, position));
checkAckValidationError(ack, position);
totalAckCount += ackedCount;
}
- subscription.acknowledgeMessage(positionsAcked, AckType.Individual,
properties);
+ subscription.acknowledgeMessage(positionsAcked.stream()
+ .map(Pair::getRight)
+ .collect(Collectors.toList()), AckType.Individual, properties);
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalAckCount);
if (isTransactionEnabled() &&
Subscription.isIndividualAckMode(subType)) {
- completableFuture.whenComplete((v, e) ->
positionsAcked.forEach(position -> {
+ completableFuture.whenComplete((v, e) ->
positionsAcked.forEach(positionPair -> {
+ Consumer ackOwnerConsumer = positionPair.getLeft();
+ Position position = positionPair.getRight();
//check if the position can remove from the consumer pending
acks.
// the bit set is empty in pending ack handle.
if (AckSetStateUtil.hasAckSet(position)) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(position)) {
- removePendingAcks(position);
+ removePendingAcks(ackOwnerConsumer, position);
}
}
}));
@@ -590,7 +597,7 @@ public class Consumer {
//this method is for individual ack carry the transaction
private CompletableFuture<Long> individualAckWithTransaction(CommandAck
ack) {
// Individual ack
- List<MutablePair<Position, Integer>> positionsAcked = new
ArrayList<>();
+ List<Pair<Consumer, MutablePair<Position, Integer>>> positionsAcked =
new ArrayList<>();
if (!isTransactionEnabled()) {
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException("Server
don't support transaction ack!"));
@@ -600,20 +607,23 @@ public class Consumer {
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
Position position =
AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(),
msgId.getEntryId(), null);
+ Consumer ackOwnerConsumer =
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
+ msgId.getEntryId()).getLeft();
// acked count at least one
- long ackedCount = 0;
- long batchSize = 0;
+ long ackedCount;
+ long batchSize;
if (msgId.hasBatchSize()) {
batchSize = msgId.getBatchSize();
// ack batch messages set ackeCount = batchSize
ackedCount = msgId.getBatchSize();
- positionsAcked.add(new MutablePair<>(position,
msgId.getBatchSize()));
+ positionsAcked.add(Pair.of(ackOwnerConsumer, new
MutablePair<>(position, msgId.getBatchSize())));
} else {
// ack no batch message set ackedCount = 1
+ batchSize = 0;
ackedCount = 1;
- positionsAcked.add(new MutablePair<>(position, (int)
batchSize));
+ positionsAcked.add(Pair.of(ackOwnerConsumer, new
MutablePair<>(position, (int) batchSize)));
}
- Consumer ackOwnerConsumer =
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
@@ -625,7 +635,7 @@ public class Consumer {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
- checkCanRemovePendingAcksAndHandle(position, msgId);
+ checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position,
msgId);
checkAckValidationError(ack, position);
@@ -633,14 +643,16 @@ public class Consumer {
}
CompletableFuture<Void> completableFuture =
transactionIndividualAcknowledge(ack.getTxnidMostBits(),
- ack.getTxnidLeastBits(), positionsAcked);
+ ack.getTxnidLeastBits(),
positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
if (Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) ->
- positionsAcked.forEach(positionLongMutablePair -> {
+ positionsAcked.forEach(positionPair -> {
+ Consumer ackOwnerConsumer = positionPair.getLeft();
+ MutablePair<Position, Integer> positionLongMutablePair
= positionPair.getRight();
if
(AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
-
removePendingAcks(positionLongMutablePair.left);
+ removePendingAcks(ackOwnerConsumer,
positionLongMutablePair.left);
}
}
}));
@@ -648,24 +660,6 @@ public class Consumer {
return completableFuture.thenApply(__ -> totalAckCount.sum());
}
- private long getBatchSize(MessageIdData msgId) {
- long batchSize = 1;
- if (Subscription.isIndividualAckMode(subType)) {
- LongPair longPair = pendingAcks.get(msgId.getLedgerId(),
msgId.getEntryId());
- // Consumer may ack the msg that not belongs to it.
- if (longPair == null) {
- Consumer ackOwnerConsumer =
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
- longPair =
ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
- if (longPair != null) {
- batchSize = longPair.first;
- }
- } else {
- batchSize = longPair.first;
- }
- }
- return batchSize;
- }
-
private long getAckedCountForMsgIdNoAckSets(long batchSize, Position
position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled &&
Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
@@ -725,26 +719,39 @@ public class Consumer {
}
}
- private boolean checkCanRemovePendingAcksAndHandle(Position position,
MessageIdData msgId) {
+ private boolean checkCanRemovePendingAcksAndHandle(Consumer
ackOwnedConsumer,
+ Position position,
MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) &&
msgId.getAckSetsCount() == 0) {
- return removePendingAcks(position);
+ return removePendingAcks(ackOwnedConsumer, position);
}
return false;
}
- private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
- Consumer ackOwnerConsumer = this;
+ /**
+ * Retrieves the acknowledgment owner consumer and batch size for the
specified ledgerId and entryId.
+ *
+ * @param ledgerId The ID of the ledger.
+ * @param entryId The ID of the entry.
+ * @return Pair<Consumer, BatchSize>
+ */
+ private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long
ledgerId, long entryId) {
if (Subscription.isIndividualAckMode(subType)) {
- if (!getPendingAcks().containsKey(ledgerId, entryId)) {
+ LongPair longPair = getPendingAcks().get(ledgerId, entryId);
+ if (longPair != null) {
+ return Pair.of(this, longPair.first);
+ } else {
+ // If there are more consumers, this step will consume more
CPU, and it should be optimized later.
for (Consumer consumer : subscription.getConsumers()) {
- if (consumer != this &&
consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
- ackOwnerConsumer = consumer;
- break;
+ if (consumer != this) {
+ longPair = consumer.getPendingAcks().get(ledgerId,
entryId);
+ if (longPair != null) {
+ return Pair.of(consumer, longPair.first);
+ }
}
}
}
}
- return ackOwnerConsumer;
+ return Pair.of(this, 1L);
}
private long[] getCursorAckSet(Position position) {
@@ -1019,44 +1026,24 @@ public class Consumer {
*
* @param position
*/
- private boolean removePendingAcks(Position position) {
- Consumer ackOwnedConsumer = null;
- if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) ==
null) {
- for (Consumer consumer : subscription.getConsumers()) {
- if (!consumer.equals(this) &&
consumer.getPendingAcks().containsKey(position.getLedgerId(),
- position.getEntryId())) {
- ackOwnedConsumer = consumer;
- break;
- }
- }
- } else {
- ackOwnedConsumer = this;
+ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position
position) {
+ if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(),
position.getEntryId())) {
+ // Message was already removed by the other consumer
+ return false;
}
-
- // remove pending message from appropriate consumer and unblock
unAckMsg-flow if requires
- LongPair ackedPosition = ackOwnedConsumer != null
- ?
ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(),
position.getEntryId())
- : null;
- if (ackedPosition != null) {
- if
(!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(),
position.getEntryId())) {
- // Message was already removed by the other consumer
- return false;
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] consumer {} received ack {}", topicName,
subscription, consumerId, position);
- }
- // unblock consumer-throttling when limit check is disabled or
receives half of maxUnackedMessages =>
- // consumer can start again consuming messages
- int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
- if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) &&
ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
- && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
- || !shouldBlockConsumerOnUnackMsgs()) {
- ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
- flowConsumerBlockedPermits(ackOwnedConsumer);
- }
- return true;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] consumer {} received ack {}", topicName,
subscription, consumerId, position);
}
- return false;
+ // unblock consumer-throttling when limit check is disabled or
receives half of maxUnackedMessages =>
+ // consumer can start again consuming messages
+ int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
+ if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) &&
ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
+ && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
+ || !shouldBlockConsumerOnUnackMsgs()) {
+ ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
+ flowConsumerBlockedPermits(ackOwnedConsumer);
+ }
+ return true;
}
public ConcurrentLongLongPairHashMap getPendingAcks() {