poorbarcode commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1461095231


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -4692,4 +4696,57 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
         consumer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider(name = "ackArgs")
+    public Object[] ackArgs() {
+        int batchSize = 10;
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0);
+        return new Object[][]{
+            // no batch.
+            {CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)},
+            {CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)},
+            // batch without ackSet.
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
+            // batch with ackSe.

Review Comment:
   Fixed



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -4692,4 +4696,57 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
         consumer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider(name = "ackArgs")
+    public Object[] ackArgs() {
+        int batchSize = 10;
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0);
+        return new Object[][]{
+            // no batch.
+            {CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)},
+            {CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)},
+            // batch without ackSet.
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
+            // batch with ackSe.
+            {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)},
+            {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)}
+        };
+    }
+
+    @Test(dataProvider = "ackArgs")
+    public void testImmediateAckWhenReconnecting(CommandAck.AckType ackType, MessageId messageId) throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        PulsarClient delayConnectClient = createDelayReconnectClient();

Review Comment:
   Fixed



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -871,6 +871,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
                 if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
                     increaseAvailablePermits(cnx, getCurrentReceiverQueueSize());
                 }
+                acknowledgmentsGroupingTracker.afterConsumerReconnected();

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to