gaoran10 commented on a change in pull request #13383:
URL: https://github.com/apache/pulsar/pull/13383#discussion_r777548420



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
##########
@@ -673,7 +673,7 @@ public void 
testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Ex
      *
      * @throws Exception
      */
-    @Test(dataProvider = "containerBuilder", timeOut = 3000)

Review comment:
       Do we need to delete the test timeout?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -776,6 +863,12 @@ public void 
redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
             LongPair longPair = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
             if (longPair != null) {
                 long batchSize = longPair.first;
+                if (isDeletionAtBatchIndexLevelEnabled()) {
+                    long[] cursorAckSet = getCursorAckSet(position);
+                    if (cursorAckSet != null) {
+                        batchSize -= 
BitSet.valueOf(cursorAckSet).cardinality();

Review comment:
       The `cardinality` of the `cursorAckSet` means un-acked message count, 
right?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -476,6 +499,46 @@ public void doUnsubscribe(final long requestId) {
         return completableFuture;
     }
 
+    private long getBatchSize(MessageIdData msgId) {
+        long batchSize = 1;
+        if (Subscription.isIndividualAckMode(subType)) {
+            LongPair longPair = pendingAcks.get(msgId.getLedgerId(), 
msgId.getEntryId());
+            // Consumer may ack the msg that not belongs to it.
+            if (longPair == null) {
+                Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+                longPair = 
ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
+                if (longPair != null) {
+                    batchSize = longPair.first;
+                }
+            } else {
+                batchSize = longPair.first;
+            }
+        }
+        return batchSize;
+    }
+
+    private long getAckedCount(PositionImpl position, long batchSize, long[] 
ackSets) {
+        long ackedCount;
+        if (isDeletionAtBatchIndexLevelEnabled()) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
+                int lastCardinality = cursorBitSet.cardinality();
+                BitSetRecyclable givenBitSet = 
BitSetRecyclable.create().resetWords(ackSets);
+                cursorBitSet.and(givenBitSet);
+                givenBitSet.recycle();
+                int currentCardinality = cursorBitSet.cardinality();
+                ackedCount = lastCardinality - currentCardinality;

Review comment:
       The `lastCardinality` is the last un-ack message count and the 
`currentCardinality` is the current un-ack message count, right? Could we use 
the meaning of the variable as the name? Maybe it's easier to understand. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to