BewareMyPower commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1461256290


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
             case Individual:
                 return addIndividualAcknowledgment(msgId,
                         batchMessageId,
-                        __ -> doIndividualAck(__, properties),
-                        __ -> doIndividualBatchAck(__, properties));
+                        __ -> doIndividualAck(__, properties, false),
+                        __ -> doIndividualBatchAck(__, properties, false));
             case Cumulative:
                 if (batchMessageId != null) {
                     consumer.onAcknowledgeCumulative(batchMessageId, null);
                 } else {
                     consumer.onAcknowledgeCumulative(msgId, null);
                 }
                 if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
-                    return doCumulativeAck(msgId, properties, null);
+                    return doCumulativeAck(msgId, properties, null, false);
                 } else if (batchIndexAckEnabled) {
-                    return doCumulativeBatchIndexAck(batchMessageId, properties);
+                    return doCumulativeBatchIndexAck(batchMessageId, properties, false);
                 } else {
-                    doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null);
+                    doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null, false);
                     return CompletableFuture.completedFuture(null);
                 }
             default:
                 throw new IllegalStateException("Unknown AckType: " + ackType);
         }
     }
 
-    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
-        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties,
+                                                    boolean queueDueToConnecting) {
+        if (!queueDueToConnecting
+                && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {

Review Comment:
   > split PersistentAcknowledgmentsGroupingTracker into two implementations
   
   It's reasonable. BTW, the design of the C++ client is better in that it has two separate implementations:
   - [AckGroupingEnabled](https://github.com/apache/pulsar-client-cpp/blob/main/lib/AckGroupingTrackerEnabled.cc): caches the acknowledgments and flushes them when the timeout expires or the count limit is reached
   - [AckGroupingDisabled](https://github.com/apache/pulsar-client-cpp/blob/main/lib/AckGroupingTrackerDisabled.cc): acknowledges immediately
   
   
https://github.com/apache/pulsar-client-cpp/blob/72b7311aeef32e28a28e926da686aaf948e8f948/lib/ConsumerImpl.cc#L201C1-L215C6
   
   Though the naming is not good, it is consistent with the other impl classes in the library.
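
To make the suggested split concrete, here is a minimal Java sketch of the two-implementation design. It is not the actual Pulsar client code: `AckTracker`, `ImmediateAckTracker`, `GroupingAckTracker`, and `AckTrackers.create` are hypothetical names, message IDs are reduced to `long` values, and the sender callback stands in for whatever actually writes the ack command. Only the count-based flush is modeled; a real tracker would also call `flush()` from a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface for illustration; not the Pulsar client API.
interface AckTracker {
    void addAcknowledgment(long messageId);
    void flush();
}

// Grouping disabled: every acknowledgment is sent immediately.
final class ImmediateAckTracker implements AckTracker {
    private final Consumer<List<Long>> sender;

    ImmediateAckTracker(Consumer<List<Long>> sender) {
        this.sender = sender;
    }

    @Override
    public void addAcknowledgment(long messageId) {
        sender.accept(List.of(Long.valueOf(messageId)));
    }

    @Override
    public void flush() {
        // Nothing is ever cached, so there is nothing to flush.
    }
}

// Grouping enabled: acknowledgments are cached and sent as one batch
// once maxGroupSize is reached or flush() is called (e.g. by a timer).
final class GroupingAckTracker implements AckTracker {
    private final Consumer<List<Long>> sender;
    private final int maxGroupSize;
    private final List<Long> pending = new ArrayList<>();

    GroupingAckTracker(Consumer<List<Long>> sender, int maxGroupSize) {
        this.sender = sender;
        this.maxGroupSize = maxGroupSize;
    }

    @Override
    public synchronized void addAcknowledgment(long messageId) {
        pending.add(messageId);
        if (pending.size() >= maxGroupSize) {
            flush();
        }
    }

    @Override
    public synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending)); // hand over a copy
        pending.clear();
    }
}

// Hypothetical factory: a group time of 0 selects the immediate tracker,
// mirroring the acknowledgementGroupTimeMicros == 0 check in the diff above.
final class AckTrackers {
    static AckTracker create(long groupTimeMicros, int maxGroupSize,
                             Consumer<List<Long>> sender) {
        return groupTimeMicros == 0
                ? new ImmediateAckTracker(sender)
                : new GroupingAckTracker(sender, maxGroupSize);
    }
}
```

With this shape the choice is made once at construction time, so neither implementation needs a per-call `acknowledgementGroupTimeMicros == 0` branch.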


