315157973 commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058425020


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java:
##########
@@ -105,23 +105,11 @@ public byte[] toByteArray() {
         return toByteArray(batchIndex, batchSize);
     }
 
-    public boolean ackIndividual() {
-        return acker.ackIndividual(batchIndex);
-    }
-
     public boolean ackCumulative() {
         return acker.ackCumulative(batchIndex);
     }
 
-    public int getOutstandingAcksInSameBatch() {
-        return acker.getOutstandingAcks();
-    }
-
     public int getBatchSize() {
-        return acker.getBatchSize();
-    }
-

Review Comment:
   These are the APIs exposed by the client, can they be deleted directly 
without any compatibility?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,52 @@ protected CompletableFuture<Messages<T>> 
internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, 
AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    @Nullable
+    private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl 
messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> 
BatchMessageAcker.newAcker(messageId.getBatchSize()));
+        } else {
+            acker = messageId.getAcker();
+        }
+        if (ackType == AckType.Individual) {
+            if (acker.ackIndividual(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), 
messageId.getEntryId()));
+                return messageId.toMessageIdImpl();
+            } else {
+                return conf.isBatchIndexAckEnabled() ? messageId : null;
+            }
+        } else {
+            if (acker.ackCumulative(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), 
messageId.getEntryId()));

Review Comment:
   Shouldn't everything before this msgId be removed?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -129,121 +124,53 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
-                                                         AckType ackType, 
Map<String, Long> properties) {
-        if (AckType.Cumulative.equals(ackType)) {
-            if (consumer.isAckReceiptEnabled()) {
-                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
-                messageIds.forEach(messageId ->
-                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
-                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> 
messageIds,

Review Comment:
   A public API is directly modified, and it is incompatible. If a user 
encapsulates this API and passes in other MessageId implementation classes, the 
old interface will be unavailable



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -549,13 +600,33 @@ protected CompletableFuture<Void> doAcknowledge(MessageId 
messageId, AckType ack
             return doTransactionAcknowledgeForResponse(messageId, ackType, 
null, properties,
                     new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
         }
-        return 
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, 
ackType, properties);
+        if (ackType == AckType.Individual) {
+            onAcknowledge(messageId, null);
+        } else {
+            onAcknowledgeCumulative(messageId, null);
+        }
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            MessageIdImpl messageIdImpl = 
getMessageIdToAcknowledge(batchMessageId, ackType);

Review Comment:
   Would it be better to change the name of this method to AcknowledgeAndGet? 
Just like Atomic's IncreaseAndGet



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to