lhotari commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979686788


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> 
addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> 
properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all 
ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, 
properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the 
batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && 
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), 
ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl 
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex());
-        
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> 
individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> 
batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   there's a change in behavior in calling `onAcknowledge`. Previous, the 
original message id instance got passed. Now some information is potentially 
lost in the case of a batch message id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to