This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b6378ae8b [improve][broker] Optimize the performance of individual 
acknowledgments (#23072)
77b6378ae8b is described below

commit 77b6378ae8b9ac83962f71063ad44d6ac57f8e32
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 29 22:02:29 2024 +0800

    [improve][broker] Optimize the performance of individual acknowledgments 
(#23072)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 151 ++++++++++-----------
 1 file changed, 69 insertions(+), 82 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index dca64395d86..7f46e8969eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -531,14 +532,16 @@ public class Consumer {
 
     //this method is for individual ack not carry the transaction
     private CompletableFuture<Long> individualAckNormal(CommandAck ack, 
Map<String, Long> properties) {
-        List<Position> positionsAcked = new ArrayList<>();
+        List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
         long totalAckCount = 0;
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             Position position;
-            long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
-            Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+            Pair<Consumer, Long> ackOwnerConsumerAndBatchSize =
+                    getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), 
msgId.getEntryId());
+            Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.getLeft();
+            long ackedCount;
+            long batchSize = ackOwnerConsumerAndBatchSize.getRight();
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
@@ -557,28 +560,32 @@ public class Consumer {
             } else {
                 position = PositionFactory.create(msgId.getLedgerId(), 
msgId.getEntryId());
                 ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
-                if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
+                if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, 
position, msgId)) {
                     addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
                 }
             }
 
-            positionsAcked.add(position);
+            positionsAcked.add(Pair.of(ackOwnerConsumer, position));
 
             checkAckValidationError(ack, position);
 
             totalAckCount += ackedCount;
         }
-        subscription.acknowledgeMessage(positionsAcked, AckType.Individual, 
properties);
+        subscription.acknowledgeMessage(positionsAcked.stream()
+                .map(Pair::getRight)
+                .collect(Collectors.toList()), AckType.Individual, properties);
         CompletableFuture<Long> completableFuture = new CompletableFuture<>();
         completableFuture.complete(totalAckCount);
         if (isTransactionEnabled() && 
Subscription.isIndividualAckMode(subType)) {
-            completableFuture.whenComplete((v, e) -> 
positionsAcked.forEach(position -> {
+            completableFuture.whenComplete((v, e) -> 
positionsAcked.forEach(positionPair -> {
+                Consumer ackOwnerConsumer = positionPair.getLeft();
+                Position position = positionPair.getRight();
                 //check if the position can remove from the consumer pending 
acks.
                 // the bit set is empty in pending ack handle.
                 if (AckSetStateUtil.hasAckSet(position)) {
                     if (((PersistentSubscription) subscription)
                             .checkIsCanDeleteConsumerPendingAck(position)) {
-                        removePendingAcks(position);
+                        removePendingAcks(ackOwnerConsumer, position);
                     }
                 }
             }));
@@ -590,7 +597,7 @@ public class Consumer {
     //this method is for individual ack carry the transaction
     private CompletableFuture<Long> individualAckWithTransaction(CommandAck 
ack) {
         // Individual ack
-        List<MutablePair<Position, Integer>> positionsAcked = new 
ArrayList<>();
+        List<Pair<Consumer, MutablePair<Position, Integer>>> positionsAcked = 
new ArrayList<>();
         if (!isTransactionEnabled()) {
             return FutureUtil.failedFuture(
                     new BrokerServiceException.NotAllowedException("Server 
don't support transaction ack!"));
@@ -600,20 +607,23 @@ public class Consumer {
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             Position position = 
AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), 
msgId.getEntryId(), null);
+            Consumer ackOwnerConsumer = 
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
+                    msgId.getEntryId()).getLeft();
             // acked count at least one
-            long ackedCount = 0;
-            long batchSize = 0;
+            long ackedCount;
+            long batchSize;
             if (msgId.hasBatchSize()) {
                 batchSize = msgId.getBatchSize();
                 // ack batch messages set ackeCount = batchSize
                 ackedCount = msgId.getBatchSize();
-                positionsAcked.add(new MutablePair<>(position, 
msgId.getBatchSize()));
+                positionsAcked.add(Pair.of(ackOwnerConsumer, new 
MutablePair<>(position, msgId.getBatchSize())));
             } else {
                 // ack no batch message set ackedCount = 1
+                batchSize = 0;
                 ackedCount = 1;
-                positionsAcked.add(new MutablePair<>(position, (int) 
batchSize));
+                positionsAcked.add(Pair.of(ackOwnerConsumer, new 
MutablePair<>(position, (int) batchSize)));
             }
-            Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
@@ -625,7 +635,7 @@ public class Consumer {
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
 
-            checkCanRemovePendingAcksAndHandle(position, msgId);
+            checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, 
msgId);
 
             checkAckValidationError(ack, position);
 
@@ -633,14 +643,16 @@ public class Consumer {
         }
 
         CompletableFuture<Void> completableFuture = 
transactionIndividualAcknowledge(ack.getTxnidMostBits(),
-                ack.getTxnidLeastBits(), positionsAcked);
+                ack.getTxnidLeastBits(), 
positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
         if (Subscription.isIndividualAckMode(subType)) {
             completableFuture.whenComplete((v, e) ->
-                    positionsAcked.forEach(positionLongMutablePair -> {
+                    positionsAcked.forEach(positionPair -> {
+                        Consumer ackOwnerConsumer = positionPair.getLeft();
+                        MutablePair<Position, Integer> positionLongMutablePair 
= positionPair.getRight();
                         if 
(AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) {
                             if (((PersistentSubscription) subscription)
                                     
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
-                                
removePendingAcks(positionLongMutablePair.left);
+                                removePendingAcks(ackOwnerConsumer, 
positionLongMutablePair.left);
                             }
                         }
                     }));
@@ -648,24 +660,6 @@ public class Consumer {
         return completableFuture.thenApply(__ -> totalAckCount.sum());
     }
 
-    private long getBatchSize(MessageIdData msgId) {
-        long batchSize = 1;
-        if (Subscription.isIndividualAckMode(subType)) {
-            LongPair longPair = pendingAcks.get(msgId.getLedgerId(), 
msgId.getEntryId());
-            // Consumer may ack the msg that not belongs to it.
-            if (longPair == null) {
-                Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
-                longPair = 
ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
-                if (longPair != null) {
-                    batchSize = longPair.first;
-                }
-            } else {
-                batchSize = longPair.first;
-            }
-        }
-        return batchSize;
-    }
-
     private long getAckedCountForMsgIdNoAckSets(long batchSize, Position 
position, Consumer consumer) {
         if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
             long[] cursorAckSet = getCursorAckSet(position);
@@ -725,26 +719,39 @@ public class Consumer {
         }
     }
 
-    private boolean checkCanRemovePendingAcksAndHandle(Position position, 
MessageIdData msgId) {
+    private boolean checkCanRemovePendingAcksAndHandle(Consumer 
ackOwnedConsumer,
+                                                       Position position, 
MessageIdData msgId) {
         if (Subscription.isIndividualAckMode(subType) && 
msgId.getAckSetsCount() == 0) {
-            return removePendingAcks(position);
+            return removePendingAcks(ackOwnedConsumer, position);
         }
         return false;
     }
 
-    private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
-        Consumer ackOwnerConsumer = this;
+    /**
+     * Retrieves the acknowledgment owner consumer and batch size for the 
specified ledgerId and entryId.
+     *
+     * @param ledgerId The ID of the ledger.
+     * @param entryId The ID of the entry.
+     * @return Pair<Consumer, BatchSize>
+     */
+    private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long 
ledgerId, long entryId) {
         if (Subscription.isIndividualAckMode(subType)) {
-            if (!getPendingAcks().containsKey(ledgerId, entryId)) {
+            LongPair longPair = getPendingAcks().get(ledgerId, entryId);
+            if (longPair != null) {
+                return Pair.of(this, longPair.first);
+            } else {
+                // If there are more consumers, this step will consume more 
CPU, and it should be optimized later.
                 for (Consumer consumer : subscription.getConsumers()) {
-                    if (consumer != this && 
consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
-                        ackOwnerConsumer = consumer;
-                        break;
+                    if (consumer != this) {
+                        longPair = consumer.getPendingAcks().get(ledgerId, 
entryId);
+                        if (longPair != null) {
+                            return Pair.of(consumer, longPair.first);
+                        }
                     }
                 }
             }
         }
-        return ackOwnerConsumer;
+        return Pair.of(this, 1L);
     }
 
     private long[] getCursorAckSet(Position position) {
@@ -1019,44 +1026,24 @@ public class Consumer {
      *
      * @param position
      */
-    private boolean removePendingAcks(Position position) {
-        Consumer ackOwnedConsumer = null;
-        if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == 
null) {
-            for (Consumer consumer : subscription.getConsumers()) {
-                if (!consumer.equals(this) && 
consumer.getPendingAcks().containsKey(position.getLedgerId(),
-                        position.getEntryId())) {
-                    ackOwnedConsumer = consumer;
-                    break;
-                }
-            }
-        } else {
-            ackOwnedConsumer = this;
+    private boolean removePendingAcks(Consumer ackOwnedConsumer, Position 
position) {
+        if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), 
position.getEntryId())) {
+            // Message was already removed by the other consumer
+            return false;
         }
-
-        // remove pending message from appropriate consumer and unblock 
unAckMsg-flow if requires
-        LongPair ackedPosition = ackOwnedConsumer != null
-                ? 
ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), 
position.getEntryId())
-                : null;
-        if (ackedPosition != null) {
-            if 
(!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), 
position.getEntryId())) {
-                // Message was already removed by the other consumer
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] consumer {} received ack {}", topicName, 
subscription, consumerId, position);
-            }
-            // unblock consumer-throttling when limit check is disabled or 
receives half of maxUnackedMessages =>
-            // consumer can start again consuming messages
-            int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
-            if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && 
ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
-                    && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
-                    || !shouldBlockConsumerOnUnackMsgs()) {
-                ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
-                flowConsumerBlockedPermits(ackOwnedConsumer);
-            }
-            return true;
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] consumer {} received ack {}", topicName, 
subscription, consumerId, position);
         }
-        return false;
+        // unblock consumer-throttling when limit check is disabled or 
receives half of maxUnackedMessages =>
+        // consumer can start again consuming messages
+        int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
+        if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && 
ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
+                && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
+                || !shouldBlockConsumerOnUnackMsgs()) {
+            ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
+            flowConsumerBlockedPermits(ackOwnedConsumer);
+        }
+        return true;
     }
 
     public ConcurrentLongLongPairHashMap getPendingAcks() {

Reply via email to