This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f707ba837 [improve][broker] Remove check ack.getMessageIdsCount() == 
1 for cumulative ack (#18333)
97f707ba837 is described below

commit 97f707ba837c15f3ab70d128c4b436a5d4c050b2
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Nov 7 10:33:09 2022 +0800

    [improve][broker] Remove check ack.getMessageIdsCount() == 1 for cumulative 
ack (#18333)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f43f16ef587..ac924725f7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -425,19 +425,19 @@ public class Consumer {
                         subscription, consumerId);
                 return CompletableFuture.completedFuture(null);
             }
-            PositionImpl position = PositionImpl.EARLIEST;
-            if (ack.getMessageIdsCount() == 1) {
-                MessageIdData msgId = ack.getMessageIdAt(0);
-                if (msgId.getAckSetsCount() > 0) {
-                    long[] ackSets = new long[msgId.getAckSetsCount()];
-                    for (int j = 0; j < msgId.getAckSetsCount(); j++) {
-                        ackSets[j] = msgId.getAckSetAt(j);
-                    }
-                    position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId(), ackSets);
-                } else {
-                    position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
+
+            PositionImpl position;
+            MessageIdData msgId = ack.getMessageIdAt(0);
+            if (msgId.getAckSetsCount() > 0) {
+                long[] ackSets = new long[msgId.getAckSetsCount()];
+                for (int j = 0; j < msgId.getAckSetsCount(); j++) {
+                    ackSets[j] = msgId.getAckSetAt(j);
                 }
+                position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId(), ackSets);
+            } else {
+                position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
             }
+
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = 
Collections.singletonList(position);
                 future = 
transactionCumulativeAcknowledge(ack.getTxnidMostBits(),

Reply via email to