codelipenghui commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1458193299


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -4692,4 +4696,57 @@ public void flush(ChannelHandlerContext ctx) throws 
Exception {
         consumer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider(name = "ackArgs")
+    public Object[] ackArgs() {
+        int batchSize = 10;
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0);
+        return new Object[][]{
+            // no batch.
+            {CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)},
+            {CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)},
+            // batch without ackSet.
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
+            // batch with ackSe.

Review Comment:
   ```suggestion
               // batch with ackSet.
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -4692,4 +4696,57 @@ public void flush(ChannelHandlerContext ctx) throws 
Exception {
         consumer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider(name = "ackArgs")
+    public Object[] ackArgs() {
+        int batchSize = 10;
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0);
+        return new Object[][]{
+            // no batch.
+            {CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)},
+            {CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)},
+            // batch without ackSet.
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
+            // batch with ackSe.
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0, 
batchSize, bitSet)},
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0, 
batchSize, bitSet)}
+        };
+    }
+
+    @Test(dataProvider = "ackArgs")
+    public void testImmediateAckWhenReconnecting(CommandAck.AckType ackType, 
MessageId messageId) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        PulsarClient delayConnectClient = createDelayReconnectClient();

Review Comment:
   The client should be closed?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void> 
addAcknowledgment(MessageIdAdv msgId,
             case Individual:
                 return addIndividualAcknowledgment(msgId,
                         batchMessageId,
-                        __ -> doIndividualAck(__, properties),
-                        __ -> doIndividualBatchAck(__, properties));
+                        __ -> doIndividualAck(__, properties, false),
+                        __ -> doIndividualBatchAck(__, properties, false));
             case Cumulative:
                 if (batchMessageId != null) {
                     consumer.onAcknowledgeCumulative(batchMessageId, null);
                 } else {
                     consumer.onAcknowledgeCumulative(msgId, null);
                 }
                 if (batchMessageId == null || 
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
-                    return doCumulativeAck(msgId, properties, null);
+                    return doCumulativeAck(msgId, properties, null, false);
                 } else if (batchIndexAckEnabled) {
-                    return doCumulativeBatchIndexAck(batchMessageId, 
properties);
+                    return doCumulativeBatchIndexAck(batchMessageId, 
properties, false);
                 } else {
-                    
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, 
null);
+                    
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, 
null, false);
                     return CompletableFuture.completedFuture(null);
                 }
             default:
                 throw new IllegalStateException("Unknown AckType: " + ackType);
         }
     }
 
-    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, 
Map<String, Long> properties) {
-        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, 
Map<String, Long> properties,
+                                                    boolean 
queueDueToConnecting) {
+        if (!queueDueToConnecting
+                && (acknowledgementGroupTimeMicros == 0 || (properties != null 
&& !properties.isEmpty()))) {

Review Comment:
   Will it break the behavior of 
   
   ```
   // We cannot group acks if the delay is 0 or when there are properties 
attached to it. Fortunately that's an
   // uncommon condition since it's only used for the compaction subscription.
   ```
   If the consumer is reconnecting but the ack has properties. We will also 
group the acks which is not expected?
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -871,6 +871,7 @@ public CompletableFuture<Void> connectionOpened(final 
ClientCnx cnx) {
                 if (!(firstTimeConnect && hasParentConsumer) && 
getCurrentReceiverQueueSize() != 0) {
                     increaseAvailablePermits(cnx, 
getCurrentReceiverQueueSize());
                 }
+                acknowledgmentsGroupingTracker.afterConsumerReconnected();

Review Comment:
   Why not call `flush()` method directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to