eolivelli commented on a change in pull request #13383:
URL: https://github.com/apache/pulsar/pull/13383#discussion_r776330462



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -233,7 +236,7 @@ public boolean readCompacted() {
             writePromise.setSuccess(null);
             return writePromise;
         }
-
+        int unackedMessage = totalMessages;

Review comment:
       Typo: change to unackedMessages (plural)

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -489,6 +552,31 @@ private void 
checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
         }
     }
 
+    private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
+        Consumer ackOwnerConsumer = this;
+        if (Subscription.isIndividualAckMode(subType)) {
+            for (Consumer consumer : subscription.getConsumers()) {
+                if (!consumer.equals(this) && 
consumer.getPendingAcks().containsKey(ledgerId, entryId)) {

Review comment:
       Here it is better to use '==' for this comparison against 'this'

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -476,6 +499,46 @@ public void doUnsubscribe(final long requestId) {
         return completableFuture;
     }
 
+    private long getBatchSize(MessageIdData msgId) {
+        long batchSize = 1;
+        if (Subscription.isIndividualAckMode(subType)) {
+            LongPair longPair = pendingAcks.get(msgId.getLedgerId(), 
msgId.getEntryId());
+            // Consumer may ack the msg that not belongs to it.
+            if (longPair == null) {
+                Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+                longPair = 
ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
+                if (longPair != null) {
+                    batchSize = longPair.first;
+                }
+            } else {
+                batchSize = longPair.first;
+            }
+        }
+        return batchSize;
+    }
+
+    private long getAckedCount(PositionImpl position, long batchSize, long[] 
ackSets) {
+        long ackedCount;
+        if (isDeletionAtBatchIndexLevelEnabled()) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
+                int lastCardinality = cursorBitSet.cardinality();
+                BitSetRecyclable givenBitSet = 
BitSetRecyclable.create().resetWords(ackSets);
+                cursorBitSet.and(givenBitSet);
+                givenBitSet.recycle();
+                int currentCardinality = cursorBitSet.cardinality();
+                ackedCount = Math.abs(currentCardinality - lastCardinality);

Review comment:
       Why do we need to use 'abs' here?
   
   If an underflow is possible then we are going to set a meaningless value

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
##########
@@ -66,6 +68,7 @@
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

Review comment:
       Here here two problems:
   - this line affects all the tests in this class, so you are changing 
existing tests
   prwe are not testing what happens when the value is 'false' (so some of your 
new branches won't be touched by this test)
   
   So probably it is better to run this class with both 'true' and 'false'

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1815,13 +1815,13 @@ public void testUnackedBlockAtBatch(int 
batchMessageDelayMs, boolean ackReceiptE
                     messages.add(msg);
                     totalReceiveMessages++;
                     consumer1.acknowledgeAsync(msg);
-                    log.info("Received message: " + new String(msg.getData()));
                 } else {
                     break;
                 }
             }
             // verify total-consumer messages = total-produce messages
-            assertEquals(totalProducedMsgs, totalReceiveMessages);
+            final int finalTotalReceiveMessages = totalReceiveMessages;
+            Awaitility.await().untilAsserted(() -> 
assertEquals(totalProducedMsgs, finalTotalReceiveMessages));

Review comment:
       Using awaitility seems useless here as it looks like you are comparing 
two constants




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to