poorbarcode commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1461095814


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
             case Individual:
                 return addIndividualAcknowledgment(msgId,
                         batchMessageId,
-                        __ -> doIndividualAck(__, properties),
-                        __ -> doIndividualBatchAck(__, properties));
+                        __ -> doIndividualAck(__, properties, false),
+                        __ -> doIndividualBatchAck(__, properties, false));
             case Cumulative:
                 if (batchMessageId != null) {
                     consumer.onAcknowledgeCumulative(batchMessageId, null);
                 } else {
                     consumer.onAcknowledgeCumulative(msgId, null);
                 }
                 if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
-                    return doCumulativeAck(msgId, properties, null);
+                    return doCumulativeAck(msgId, properties, null, false);
                 } else if (batchIndexAckEnabled) {
-                    return doCumulativeBatchIndexAck(batchMessageId, properties);
+                    return doCumulativeBatchIndexAck(batchMessageId, properties, false);
                 } else {
-                    doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null);
+                    doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null, false);
                     return CompletableFuture.completedFuture(null);
                 }
             default:
                 throw new IllegalStateException("Unknown AckType: " + ackType);
         }
     }
 
-    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
-        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties,
+                                                    boolean queueDueToConnecting) {
+        if (!queueDueToConnecting
+                && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {

Review Comment:
   I have marked the current PR as `Draft`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to