gaoran10 commented on code in PR #17016:
URL: https://github.com/apache/pulsar/pull/17016#discussion_r943086415


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -523,25 +523,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
         LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
-            PositionImpl position;
+            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            // acked count at least one
             long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
+            long batchSize = 0;

Review Comment:
   The default value of `batchSize` could be 1 rather than 0, so that a non-batch message still counts as one message.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -523,25 +523,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
         LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
-            PositionImpl position;
+            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            // acked count at least one
             long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
+            long batchSize = 0;
+            if (msgId.hasBatchSize()) {
+                batchSize = msgId.getBatchSize();
+                // ack batch messages set ackeCount = 1, if has ackSets will recompute

Review Comment:
   ```suggestion
                   // ack batch messages set ackedCount = batchSize, if has ackSets will recompute
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -523,25 +523,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
         LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
-            PositionImpl position;
+            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            // acked count at least one
             long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
+            long batchSize = 0;
+            if (msgId.hasBatchSize()) {
+                batchSize = msgId.getBatchSize();
+                // ack batch messages set ackeCount = 1, if has ackSets will recompute
+                ackedCount = msgId.getBatchSize();
+                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+            } else {
+                // ack no batch message set ackedCount = 1
+                ackedCount = 1;
+                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            }
             Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
+                // recompute ackedCount
                 ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());

Review Comment:
   Maybe we can remove this line, since `position` is already initialized with the same value at its declaration.
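
To make the counting discussed in these comments concrete, here is a minimal, self-contained sketch of how the acked count might be derived when the batch size defaults to 1. It is not the PR's code: `AckCountSketch` and `ackedCount` are hypothetical names, the nullable `Integer` stands in for `hasBatchSize()`/`getBatchSize()`, and the sketch assumes that a set bit in the ack set marks a message that is still unacknowledged, so the acked count is the batch size minus the number of set bits.

```java
import java.util.BitSet;

// Hypothetical stand-in for the per-message counting in individualAckWithTransaction;
// none of these names are Pulsar APIs.
final class AckCountSketch {

    /**
     * @param batchSize batch size carried by the message ID, or null when it is absent
     * @param ackSets   ack-set words; empty when the whole entry is acknowledged
     * @return number of messages this ack accounts for
     */
    static long ackedCount(Integer batchSize, long[] ackSets) {
        // Default to 1 so a non-batch message counts as a single message.
        long size = (batchSize != null) ? batchSize : 1;
        if (ackSets.length == 0) {
            // No ack set: the whole entry is acknowledged.
            return size;
        }
        // Assumption: a set bit marks a message that is still unacknowledged.
        BitSet remaining = BitSet.valueOf(ackSets);
        return size - remaining.cardinality();
    }

    public static void main(String[] args) {
        System.out.println(ackedCount(null, new long[0]));          // non-batch message
        System.out.println(ackedCount(4, new long[0]));             // whole batch of 4
        System.out.println(ackedCount(4, new long[] {0b0101L}));    // batch of 4, two bits still set
    }
}
```

With a default of 1, the non-batch case needs no separate branch to set the count, which is the simplification the first comment points toward.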



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
