This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 93dac25  Ack response implementation  (#8996)
93dac25 is described below

commit 93dac25ee0f1577d56b1f7abb969bc98b760da02
Author: congbo <[email protected]>
AuthorDate: Thu Jan 21 09:22:50 2021 +0800

    Ack response implementation  (#8996)
    
    ## Motivation
    in order to handle ack response implementation. When this PR commit, I will 
handle #8997.
    
    ## implement
    1. we implement a new PersistentAcknowledgmentsWithResponseGroupingTracker.
    2. we will add two ackRequests struct for async and sync flush.
    3. add a timer to handle timeout and the timeout task don't need to lock, 
because the timeout is sequential.
    ### Verifying this change
    Add the tests for it
---
 .../pulsar/broker/service/ServerCnxTest.java       |   2 +-
 .../pulsar/client/api/ConsumerAckListTest.java     |  22 +-
 .../pulsar/client/api/ConsumerCleanupTest.java     |  12 +-
 .../pulsar/client/api/ConsumerRedeliveryTest.java  |  21 +-
 .../client/api/SimpleProducerConsumerTest.java     |  71 ++-
 .../impl/BatchMessageIndexAckDisableTest.java      |  16 +-
 .../client/impl/BatchMessageIndexAckTest.java      |  16 +-
 .../pulsar/client/impl/MessageChunkingTest.java    |  26 +-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   8 +
 .../impl/AcknowledgmentsGroupingTracker.java       |  12 +-
 .../pulsar/client/impl/BatchMessageAcker.java      |   2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  70 ++-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   6 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 230 +-------
 ...NonPersistentAcknowledgmentGroupingTracker.java |  16 +-
 .../PersistentAcknowledgmentsGroupingTracker.java  | 655 +++++++++++++--------
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 107 +++-
 .../impl/ClientCnxRequestTimeoutQueueTest.java     |  13 +
 .../apache/pulsar/common/protocol/Commands.java    |  14 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 21 files changed, 724 insertions(+), 598 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index d2cb692..7ff0c14 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1256,7 +1256,7 @@ public class ServerCnxTest {
         PositionImpl pos = new PositionImpl(0, 0);
 
         clientCommand = Commands.newAck(1 /* consumer id */, 
pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
-                                        null, Collections.emptyMap());
+                                        null, Collections.emptyMap(), -1);
         channel.writeInbound(clientCommand);
 
         // verify nothing is sent out on the wire after ack
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
index d67f5b4..5238796 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
@@ -22,6 +22,7 @@ import lombok.Cleanup;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -46,15 +47,20 @@ public class ConsumerAckListTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test(timeOut = 30000)
-    public void testBatchListAck() throws Exception {
-        ackListMessage(true,true);
-        ackListMessage(true,false);
-        ackListMessage(false,false);
-        ackListMessage(false,true);
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
     }
 
-    public void ackListMessage(boolean isBatch, boolean isPartitioned) throws 
Exception {
+    @Test(timeOut = 30000, dataProvider = "ackReceiptEnabled")
+    public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
+        ackListMessage(true,true, ackReceiptEnabled);
+        ackListMessage(true,false, ackReceiptEnabled);
+        ackListMessage(false,false, ackReceiptEnabled);
+        ackListMessage(false,true, ackReceiptEnabled);
+    }
+
+    public void ackListMessage(boolean isBatch, boolean isPartitioned, boolean 
ackReceiptEnabled) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-ack-" + 
UUID.randomUUID();
         final String subName = "testBatchAck-sub" + UUID.randomUUID();
         final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
@@ -72,6 +78,8 @@ public class ConsumerAckListTest extends ProducerConsumerBase 
{
                 .topic(topic)
                 .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
                 .subscriptionName(subName)
+                .enableBatchIndexAcknowledgment(ackReceiptEnabled)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .subscribe();
         sendMessagesAsyncAndWait(producer, messageNum);
         List<MessageId> messages = new ArrayList<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 927af02..abe9354 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.UUID;
@@ -43,12 +44,19 @@ public class ConsumerCleanupTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void testAllTimerTaskShouldCanceledAfterConsumerClosed() throws 
PulsarClientException, InterruptedException {
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean 
ackReceiptEnabled)
+            throws PulsarClientException, InterruptedException {
         PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://public/default/" + 
UUID.randomUUID().toString())
                 .subscriptionName("test")
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .subscribe();
         consumer.close();
         Thread.sleep(2000);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..37337a8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -29,8 +29,10 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Sets;
@@ -54,6 +56,11 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
     /**
      * It verifies that redelivered messages are sorted based on the 
ledger-ids.
      * <pre>
@@ -64,8 +71,8 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
      * </pre>
      * @throws Exception
      */
-    @Test
-    public void testOrderedRedelivery() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testOrderedRedelivery(boolean ackReceiptEnabled) throws 
Exception {
         String topic = "persistent://my-property/my-ns/redelivery-" + 
System.currentTimeMillis();
 
         conf.setManagedLedgerMaxEntriesPerLedger(2);
@@ -77,7 +84,8 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
                 .producerName("my-producer-name")
                 .create();
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
-                .subscriptionType(SubscriptionType.Shared);
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(ackReceiptEnabled);
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
consumerBuilder.subscribe();
 
         final int totalMsgs = 100;
@@ -130,12 +138,14 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test
-    public void testUnAckMessageRedeliveryWithReceiveAsync() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testUnAckMessageRedeliveryWithReceiveAsync(boolean 
ackReceiptEnabled) throws PulsarClientException, ExecutionException, 
InterruptedException {
         String topic = "persistent://my-property/my-ns/async-unack-redelivery";
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .isAckReceiptEnabled(ackReceiptEnabled)
+                .enableBatchIndexAcknowledgment(ackReceiptEnabled)
                 .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
@@ -165,7 +175,6 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         }
 
         assertEquals(10, messageReceived);
-
         for (int i = 0; i < messages; i++) {
             Message<String> message = consumer.receive();
             assertNotNull(message);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9fc14c0..da8bcf8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -129,6 +129,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -269,6 +274,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
+    @DataProvider(name = "batchAndAckReceipt")
+    public Object[][] codecProviderWithAckReceipt() {
+        return new Object[][] { { 0, true}, { 1000, false }, { 0, true }, { 
1000, false }};
+    }
+
     @DataProvider(name = "batch")
     public Object[][] codecProvider() {
         return new Object[][] { { 0 }, { 1000 } };
@@ -311,10 +321,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test(dataProvider = "batch")
-    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws 
Exception {
+    @Test(dataProvider = "batchAndAckReceipt")
+    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic2")
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .subscriptionName("my-subscriber-name").subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -1033,8 +1044,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test
-    public void testDeactivatingBacklogConsumer() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final long batchMessageDelayMs = 100;
@@ -1047,10 +1058,12 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // 1. Subscriber Faster subscriber: let it consume all messages 
immediately
         Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
                 .topic("persistent://my-property/my-ns/" + 
topicName).subscriptionName(sub1)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
         // 1.b. Subscriber Slow subscriber:
         Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
                 .topic("persistent://my-property/my-ns/" + 
topicName).subscriptionName(sub2)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic);
@@ -1077,7 +1090,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // 3. Consume messages: at Faster subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
-            subscriber1.acknowledge(msg);
+            subscriber1.acknowledgeAsync(msg);
         }
 
         // wait : so message can be eligible to to be evict from cache
@@ -1096,7 +1109,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // 6. consume messages : at slower subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
-            subscriber2.acknowledge(msg);
+            subscriber2.acknowledgeAsync(msg);
         }
 
         topicRef.checkBackloggedCursors();
@@ -1258,13 +1271,14 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
      *
      * @throws Exception
      */
-    @Test(timeOut = 30000)
-    public void testSharedConsumerAckDifferentConsumer() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled", timeOut = 30000)
+    public void testSharedConsumerAckDifferentConsumer(boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                 
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
                 .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
         Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
         Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
@@ -1355,8 +1369,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
      *
      * @throws Exception
      */
-    @Test
-    public void testConsumerBlockingWithUnAckedMessages() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testConsumerBlockingWithUnAckedMessages(boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1368,6 +1382,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1395,13 +1410,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             assertEquals(messages.size(), unAckedMessagesBufferSize);
 
             // start acknowledging messages
-            messages.forEach(m -> {
-                try {
-                    consumer.acknowledge(m);
-                } catch (PulsarClientException e) {
-                    fail("ack failed", e);
-                }
-            });
+            messages.forEach(consumer::acknowledgeAsync);
 
             // try to consume remaining messages
             int remainingMessages = totalProducedMsgs - messages.size();
@@ -1434,8 +1443,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
      *
      * @throws Exception
      */
-    @Test
-    public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() 
throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void 
testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1450,6 +1459,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1508,8 +1518,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
      *
      * @throws Exception
      */
-    @Test
-    public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws 
Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testMutlipleSharedConsumerBlockingWithUnAckedMessages(boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1522,11 +1532,13 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
             Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             PulsarClient newPulsarClient = 
newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1663,8 +1675,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test
-    public void testUnackBlockRedeliverMessages() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1677,6 +1689,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1730,8 +1743,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test(dataProvider = "batch")
-    public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws 
Exception {
+    @Test(dataProvider = "batchAndAckReceipt")
+    public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean 
ackReceiptEnabled) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
@@ -1744,6 +1757,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
             Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                     
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .isAckReceiptEnabled(ackReceiptEnabled)
                     
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             ProducerBuilder<byte[]> producerBuidler = 
pulsarClient.newProducer()
@@ -1800,7 +1814,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 if (msg != null) {
                     messages.add(msg);
                     totalReceiveMessages++;
-                    consumer1.acknowledge(msg);
+                    consumer1.acknowledgeAsync(msg);
                     log.info("Received message: " + new String(msg.getData()));
                 } else {
                     break;
@@ -2326,8 +2340,8 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test
-    public void testRedeliveryFailOverConsumer() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int receiverQueueSize = 10;
@@ -2336,6 +2350,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
                 
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                 
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
index 474a586..46f11fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -57,8 +58,13 @@ public class BatchMessageIndexAckDisableTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void testBatchMessageIndexAckForSharedSubscription() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testBatchMessageIndexAckForSharedSubscription(boolean 
ackReceiptEnabled) throws PulsarClientException, ExecutionException, 
InterruptedException {
         final String topic = "testBatchMessageIndexAckForSharedSubscription";
 
         @Cleanup
@@ -67,6 +73,7 @@ public class BatchMessageIndexAckDisableTest extends 
ProducerConsumerBase {
             .subscriptionName("sub")
             .receiverQueueSize(100)
             .subscriptionType(SubscriptionType.Shared)
+            .isAckReceiptEnabled(ackReceiptEnabled)
             .ackTimeout(1, TimeUnit.SECONDS)
             .subscribe();
 
@@ -97,8 +104,8 @@ public class BatchMessageIndexAckDisableTest extends 
ProducerConsumerBase {
         Assert.assertEquals(received.size(), 100);
     }
 
-    @Test
-    public void testBatchMessageIndexAckForExclusiveSubscription() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testBatchMessageIndexAckForExclusiveSubscription(boolean 
ackReceiptEnabled) throws PulsarClientException, ExecutionException, 
InterruptedException {
         final String topic = 
"testBatchMessageIndexAckForExclusiveSubscription";
 
         @Cleanup
@@ -106,6 +113,7 @@ public class BatchMessageIndexAckDisableTest extends 
ProducerConsumerBase {
             .topic(topic)
             .subscriptionName("sub")
             .receiverQueueSize(100)
+            .isAckReceiptEnabled(ackReceiptEnabled)
             .subscribe();
 
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 6ff7d23..db9decb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -58,8 +59,13 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void testBatchMessageIndexAckForSharedSubscription() throws 
Exception {
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testBatchMessageIndexAckForSharedSubscription(boolean 
ackReceiptEnabled) throws Exception {
         final String topic = "testBatchMessageIndexAckForSharedSubscription";
         final String subscriptionName = "sub";
 
@@ -68,6 +74,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
             .topic(topic)
             .subscriptionName(subscriptionName)
             .receiverQueueSize(100)
+            .isAckReceiptEnabled(ackReceiptEnabled)
             .subscriptionType(SubscriptionType.Shared)
             .enableBatchIndexAcknowledgment(true)
             .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
@@ -138,8 +145,8 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         Assert.assertEquals(received.size(), 100);
     }
 
-    @Test
-    public void testBatchMessageIndexAckForExclusiveSubscription() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testBatchMessageIndexAckForExclusiveSubscription(boolean 
ackReceiptEnabled) throws PulsarClientException, ExecutionException, 
InterruptedException {
         final String topic = 
"testBatchMessageIndexAckForExclusiveSubscription";
 
         @Cleanup
@@ -147,6 +154,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
             .topic(topic)
             .subscriptionName("sub")
             .receiverQueueSize(100)
+            .isAckReceiptEnabled(ackReceiptEnabled)
             .enableBatchIndexAcknowledgment(true)
             .subscribe();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index f0294df..1d3af92 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
@@ -80,9 +81,26 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "ackReceiptEnabled")
+    public Object[][] ackReceiptEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
 
     @Test
-    public void testLargeMessage() throws Exception {
+    public void testInvalidConfig() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
+        // batching and chunking can't be enabled together
+        try {
+            Producer<byte[]> producer = 
producerBuilder.enableChunking(true).enableBatching(true).create();
+            fail("producer creation should have fail");
+        } catch (IllegalArgumentException ie) {
+            // Ok
+        }
+    }
+
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {
 
         log.info("-- Starting {} test --", methodName);
         this.conf.setMaxMessageSize(5);
@@ -90,6 +108,7 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         final String topicName = "persistent://my-property/my-ns/my-topic1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
@@ -145,8 +164,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
     }
 
-    @Test
-    public void testLargeMessageAckTimeOut() throws Exception {
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws 
Exception {
 
         log.info("-- Starting {} test --", methodName);
         this.conf.setMaxMessageSize(5);
@@ -155,6 +174,7 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, 
TimeUnit.SECONDS)
+                .isAckReceiptEnabled(ackReceiptEnabled)
                 .ackTimeout(5, TimeUnit.SECONDS).subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index c6a6555..246394a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -187,6 +187,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
     /**
+     * Ack will return receipt but does not mean that the message will not be 
resent after get receipt.
+     *
+     * @param isAckReceiptEnabled {@link Boolean} is enable ack for receipt
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> isAckReceiptEnabled(boolean isAckReceiptEnabled);
+
+    /**
      * Define the granularity of the ack-timeout redelivery.
      *
      * <p>By default, the tick time is set to 1 second. Using an higher tick 
time will
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index 47df3df..319e327 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 
 /**
@@ -31,12 +33,10 @@ public interface AcknowledgmentsGroupingTracker extends 
AutoCloseable {
 
     boolean isDuplicate(MessageId messageId);
 
-    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, 
Long> properties, TransactionImpl txn);
-
-    void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties);
+    CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType 
ackType, Map<String, Long> properties);
 
-    void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, 
int batchSize, AckType ackType,
-                                     Map<String, Long> properties, 
TransactionImpl txn);
+    CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, 
AckType ackType,
+                                                  Map<String, Long> 
properties);
 
     void flush();
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index a16dcb4..8b178f2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -42,7 +42,7 @@ class BatchMessageAcker {
     // bitset shared across messages in the same batch.
     private final int batchSize;
     private final BitSet bitSet;
-    private boolean prevBatchCumulativelyAcked = false;
+    private volatile boolean prevBatchCumulativelyAcked = false;
 
     BatchMessageAcker(BitSet bitSet, int batchSize) {
         this.bitSet = bitSet;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8a0a7f5..4e6ea0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -170,7 +170,8 @@ public class ClientCnx extends PulsarHandler {
         GetLastMessageId,
         GetTopics,
         GetSchema,
-        GetOrCreateSchema;
+        GetOrCreateSchema,
+        AckResponse;
 
         String getDescription() {
             if (this == Command) {
@@ -399,12 +400,17 @@ public class ClientCnx extends PulsarHandler {
     protected void handleAckResponse(CommandAckResponse ackResponse) {
         checkArgument(state == State.Ready);
         checkArgument(ackResponse.getRequestId() >= 0);
-        long consumerId = ackResponse.getConsumerId();
-        if (!ackResponse.hasError()) {
-            consumers.get(consumerId).ackReceipt(ackResponse.getRequestId());
+        CompletableFuture<?> completableFuture = 
pendingRequests.remove(ackResponse.getRequestId());
+        if (completableFuture != null && !completableFuture.isDone()) {
+            if (!ackResponse.hasError()) {
+                completableFuture.complete(null);
+            } else {
+                completableFuture.completeExceptionally(
+                        getPulsarClientException(ackResponse.getError(), 
ackResponse.getMessage()));
+            }
         } else {
-            consumers.get(consumerId).ackError(ackResponse.getRequestId(),
-                    getPulsarClientException(ackResponse.getError(), 
ackResponse.getMessage()));
+            log.warn("AckResponse has complete when receive response! 
requestId : {}, consumerId : {}",
+                    ackResponse.getRequestId(), ackResponse.hasConsumerId());
         }
     }
 
@@ -741,7 +747,16 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf 
request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetTopics);
+        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetTopics, true);
+    }
+
+    public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long 
requestId) {
+        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.AckResponse,true);
+    }
+
+    public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
+                                           TimedCompletableFuture<Void> 
future) {
+        sendRequestAndHandleTimeout(request, requestId, 
RequestType.AckResponse, false, future);
     }
 
     @Override
@@ -816,25 +831,39 @@ public class ClientCnx extends PulsarHandler {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long 
requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command, true);
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId, RequestType requestType) {
-        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+    private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long 
requestId,
+                                                                 RequestType 
requestType, boolean flush,
+                                                                 
TimedCompletableFuture<T> future) {
         pendingRequests.put(requestId, future);
-        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
-            if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), 
requestType.getDescription(), writeFuture.cause().getMessage());
-                pendingRequests.remove(requestId);
-                future.completeExceptionally(writeFuture.cause());
-            }
-        });
+        if (flush) {
+            ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
+                if (!writeFuture.isSuccess()) {
+                    CompletableFuture<?> newFuture = 
pendingRequests.remove(requestId);
+                    if (!newFuture.isDone()) {
+                        log.warn("{} Failed to send {} to broker: {}", 
ctx.channel(),
+                                requestType.getDescription(), 
writeFuture.cause().getMessage());
+                        future.completeExceptionally(writeFuture.cause());
+                    }
+                }
+            });
+        } else {
+            ctx.write(requestMessage, ctx().voidPromise());
+        }
         requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId, requestType));
+    }
+
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId,
+                                                 RequestType requestType, 
boolean flush) {
+        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+        sendRequestAndHandleTimeout(requestMessage, requestId, requestType, 
flush, future);
         return future;
     }
 
     public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf 
request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetLastMessageId);
+        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetLastMessageId, true);
     }
 
     public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf 
request, long requestId) {
@@ -856,11 +885,12 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<CommandGetSchemaResponse> 
sendGetRawSchema(ByteBuf request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetSchema);
+        return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetSchema, true);
     }
 
     public CompletableFuture<byte[]> sendGetOrCreateSchema(ByteBuf request, 
long requestId) {
-        CompletableFuture<CommandGetOrCreateSchemaResponse> future = 
sendRequestAndHandleTimeout(request, requestId, RequestType.GetOrCreateSchema);
+        CompletableFuture<CommandGetOrCreateSchemaResponse> future = 
sendRequestAndHandleTimeout(request, requestId,
+                RequestType.GetOrCreateSchema, true);
         return future.thenCompose(response -> {
             if (response.hasErrorCode()) {
                 // Request has failed
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 8429598..1e987e1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -194,6 +194,12 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> isAckReceiptEnabled(boolean isAckReceiptEnabled) 
{
+        conf.setAckReceiptEnabled(isAckReceiptEnabled);
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit 
timeUnit) {
         checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
                 "Ack timeout tick time should be greater than " + 
MIN_TICK_TIME_MILLIS + " ms");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 21f3f13..d8d0ec4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -60,7 +60,6 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageCrypto;
@@ -68,7 +67,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
-import 
org.apache.pulsar.client.api.PulsarClientException.MessageAcknowledgeException;
 import 
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -101,7 +99,6 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -186,8 +183,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final boolean createTopicIfDoesNotExist;
 
-    private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
-
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
@@ -346,7 +341,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
-        this.ackRequests = new ConcurrentLongHashMap<>(16, 1);
 
         grabCnx();
     }
@@ -505,43 +499,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return result;
     }
 
-    boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType 
ackType,
-                                   Map<String,Long> properties, 
TransactionImpl txn) {
-        boolean isAllMsgsAcked;
-        if (ackType == AckType.Individual) {
-            isAllMsgsAcked = txn == null && batchMessageId.ackIndividual();
-        } else {
-            isAllMsgsAcked = batchMessageId.ackCumulative();
-        }
-        int outstandingAcks = 0;
-        if (log.isDebugEnabled()) {
-            outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
-        }
-
-        int batchSize = batchMessageId.getBatchSize();
-        // all messages in this batch have been acked
-        if (isAllMsgsAcked) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] can ack message to broker {}, acktype {}, 
cardinality {}, length {}", subscription,
-                        consumerName, batchMessageId, ackType, 
outstandingAcks, batchSize);
-            }
-            return true;
-        } else {
-            if (AckType.Cumulative == ackType
-                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                sendAcknowledge(batchMessageId.prevBatchMessageId(), 
AckType.Cumulative, properties, null);
-                batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-            } else {
-                onAcknowledge(batchMessageId, null);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] cannot ack message to broker {}, acktype 
{}, pending acks - {}", subscription,
-                        consumerName, batchMessageId, ackType, 
outstandingAcks);
-            }
-        }
-        return false;
-    }
-
     @Override
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String,Long> 
properties,
@@ -562,67 +519,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return doTransactionAcknowledgeForResponse(messageId, ackType, 
null, properties,
                     new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
         }
-
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-            if (markAckForBatchMessage(batchMessageId, ackType, properties, 
txn)) {
-                // all messages in batch have been acked so broker can be 
acked via sendAcknowledge()
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] acknowledging message - {}, acktype 
{}", subscription, consumerName, messageId,
-                            ackType);
-                }
-            } else {
-                if (conf.isBatchIndexAckEnabled()) {
-                    
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId,
-                            batchMessageId.getBatchIndex(), 
batchMessageId.getBatchSize(),
-                            ackType, properties, txn);
-                }
-                // other messages in batch are still pending ack.
-                return CompletableFuture.completedFuture(null);
-            }
-        }
-        return sendAcknowledge(messageId, ackType, properties, txn);
+        return 
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, 
ackType, properties);
     }
 
     @Override
     protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList, AckType ackType, Map<String, Long> properties, TransactionImpl 
txn) {
-        if (AckType.Cumulative.equals(ackType)) {
-            List<CompletableFuture<Void>> completableFutures = new 
ArrayList<>();
-            messageIdList.forEach(messageId -> 
completableFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
-            return CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[0]));
-        }
-        if (getState() != State.Ready && getState() != State.Connecting) {
-            stats.incrementNumAcksFailed();
-            PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
-            messageIdList.forEach(messageId -> onAcknowledge(messageId, 
exception));
-            return FutureUtil.failedFuture(exception);
-        }
-        List<MessageIdImpl> nonBatchMessageIds = new ArrayList<>();
-        for (MessageId messageId : messageIdList) {
-            MessageIdImpl messageIdImpl;
-            if (messageId instanceof BatchMessageIdImpl
-                    && !markAckForBatchMessage((BatchMessageIdImpl) messageId, 
ackType, properties, txn)) {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                messageIdImpl = new 
MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId()
-                        , batchMessageId.getPartitionIndex());
-                
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, 
batchMessageId.getBatchIndex(),
-                        batchMessageId.getBatchSize(), ackType, properties, 
txn);
-                stats.incrementNumAcksSent(batchMessageId.getBatchSize());
-            } else {
-                messageIdImpl = (MessageIdImpl) messageId;
-                stats.incrementNumAcksSent(1);
-                nonBatchMessageIds.add(messageIdImpl);
-            }
-            unAckedMessageTracker.remove(messageIdImpl);
-            if (possibleSendToDeadLetterTopicMessages != null) {
-                possibleSendToDeadLetterTopicMessages.remove(messageIdImpl);
-            }
-            onAcknowledge(messageId, null);
-        }
-        if (nonBatchMessageIds.size() > 0) {
-            
acknowledgmentsGroupingTracker.addListAcknowledgment(nonBatchMessageIds, 
ackType, properties);
-        }
-        return CompletableFuture.completedFuture(null);
+        return 
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, 
ackType, properties);
     }
 
 
@@ -749,44 +651,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return CompletableFuture.completedFuture(null);
     }
 
-    // TODO: handle transactional acknowledgements.
-    private CompletableFuture<Void> sendAcknowledge(MessageId messageId, 
AckType ackType,
-                                                    Map<String,Long> 
properties,
-                                                    TransactionImpl txnImpl) {
-        MessageIdImpl msgId = (MessageIdImpl) messageId;
-
-        if (ackType == AckType.Individual) {
-            if (messageId instanceof BatchMessageIdImpl) {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-
-                stats.incrementNumAcksSent(batchMessageId.getBatchSize());
-                unAckedMessageTracker.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
-                if (possibleSendToDeadLetterTopicMessages != null) {
-                    possibleSendToDeadLetterTopicMessages.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                            batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
-                }
-            } else {
-                // increment counter by 1 for non-batch msg
-                unAckedMessageTracker.remove(msgId);
-                if (possibleSendToDeadLetterTopicMessages != null) {
-                    possibleSendToDeadLetterTopicMessages.remove(msgId);
-                }
-                stats.incrementNumAcksSent(1);
-            }
-            onAcknowledge(messageId, null);
-        } else if (ackType == AckType.Cumulative) {
-            onAcknowledgeCumulative(messageId, null);
-            
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
-        }
-
-        acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType, 
properties, txnImpl);
-
-        // Consumer acknowledgment operation immediately succeeds. In any 
case, if we're not able to send ack to broker,
-        // the messages will be re-delivered
-        return CompletableFuture.completedFuture(null);
-    }
-
     @Override
     public void negativeAcknowledge(MessageId messageId) {
         negativeAcksTracker.add(messageId);
@@ -1053,17 +917,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
-
-        if (!ackRequests.isEmpty()) {
-            ackRequests.forEach((key, value) -> {
-                value.callback
-                        .completeExceptionally(new 
MessageAcknowledgeException("Consumer has closed!"));
-                value.recycle();
-            });
-
-            ackRequests.clear();
-        }
-
         acknowledgmentsGroupingTracker.close();
         if (batchReceiveTimeout != null) {
             batchReceiveTimeout.cancel();
@@ -1679,7 +1532,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, 
ValidationError validationError) {
         ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), 
messageId.getEntryId(), null, AckType.Individual,
-                                      validationError, Collections.emptyMap());
+                                      validationError, Collections.emptyMap(), 
-1);
         currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
         increaseAvailablePermits(currentCnx);
         stats.incrementNumReceiveFailed();
@@ -2216,7 +2069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    public ConsumerStats getStats() {
+    public ConsumerStatsRecorder getStats() {
         return stats;
     }
 
@@ -2269,6 +2122,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (clientCnx != null) {
             this.connectionHandler.setClientCnx(clientCnx);
             clientCnx.registerConsumer(consumerId, this);
+            if (conf.isAckReceiptEnabled() &&
+                    
Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion())) {
+                log.warn("Server don't support ack for receipt! " +
+                        "ProtoVersion >=17 support! nowVersion : {}", 
clientCnx.getRemoteEndpointProtocolVersion());
+            }
         }
         ClientCnx previousClientCnx = 
clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
         if (previousClientCnx != null && previousClientCnx != clientCnx) {
@@ -2388,7 +2246,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                         
ValidationError validationError,
                                                                         
Map<String, Long> properties, TxnID txnID) {
-        CompletableFuture<Void> callBack = new CompletableFuture<>();
         BitSetRecyclable bitSetRecyclable = null;
         long ledgerId;
         long entryId;
@@ -2418,79 +2275,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     validationError, properties, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId);
         }
 
-        OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
-                new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
-        ackRequests.put(requestId, op);
         if (ackType == AckType.Cumulative) {
             unAckedMessageTracker.removeMessagesTill(messageId);
         } else {
             unAckedMessageTracker.remove(messageId);
         }
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
-        return callBack;
+        return cnx().newAckForReceipt(cmd, requestId);
     }
 
-    protected void ackReceipt(long requestId) {
-        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
-        if (callBackOp == null || callBackOp.callback.isDone()) {
-            log.info("Ack request has been handled requestId : {}", requestId);
-        } else {
-            callBackOp.callback.complete(null);
-            if (log.isDebugEnabled()) {
-                log.debug("MessageId : {} has ack by TxnId : {}", 
callBackOp.messageId.getLedgerId() + ":"
-                        + callBackOp.messageId.getEntryId(), 
callBackOp.txnID.toString());
-            }
-        }
-    }
-
-    protected void ackError(long requestId, PulsarClientException 
pulsarClientException) {
-        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
-        if (callBackOp == null || callBackOp.callback.isDone()) {
-            log.info("Ack request has been handled requestId : {}", requestId);
-        } else {
-            callBackOp.callback.completeExceptionally(pulsarClientException);
-            if (log.isDebugEnabled()) {
-                log.debug("MessageId : {} has ack by TxnId : {}", 
callBackOp.messageId, callBackOp.txnID);
-            }
-            callBackOp.recycle();
-        }
-    }
-
-    private static class OpForAckCallBack {
-        protected ByteBuf cmd;
-        protected CompletableFuture<Void> callback;
-        protected MessageIdImpl messageId;
-        protected TxnID txnID;
-
-        static OpForAckCallBack create(ByteBuf cmd, CompletableFuture<Void> 
callback,
-                                       MessageId messageId, TxnID txnID) {
-            OpForAckCallBack op = RECYCLER.get();
-            op.callback = callback;
-            op.cmd = cmd;
-            op.messageId = (MessageIdImpl) messageId;
-            op.txnID = txnID;
-            return op;
-        }
-
-        private OpForAckCallBack(Recycler.Handle<OpForAckCallBack> 
recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
-
-        void recycle() {
-            if (cmd != null) {
-                ReferenceCountUtil.safeRelease(cmd);
-            }
-            recyclerHandle.recycle(this);
-        }
-
-        private final Recycler.Handle<OpForAckCallBack> recyclerHandle;
-        private static final Recycler<OpForAckCallBack> RECYCLER = new 
Recycler<OpForAckCallBack>() {
-            @Override
-            protected OpForAckCallBack newObject(Handle<OpForAckCallBack> 
handle) {
-                return new OpForAckCallBack(handle);
-            }
-        };
+    public Map<MessageIdImpl, List<MessageImpl<T>>> 
getPossibleSendToDeadLetterTopicMessages() {
+        return possibleSendToDeadLetterTopicMessages;
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index 6f56f26..36cd99b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -42,20 +44,16 @@ public class NonPersistentAcknowledgmentGroupingTracker 
implements Acknowledgmen
         return false;
     }
 
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, 
Map<String,
-            Long> properties, TransactionImpl txnImpl) {
-        // no-op
-    }
-
-    @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties) {
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType, Map<String,
+            Long> properties) {
         // no-op
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize,
-                                            AckType ackType, Map<String, Long> 
properties, TransactionImpl transaction) {
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds, AckType ackType, Map<String, Long> properties) {
         // no-op
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index bfa025b..ae3eb97 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -22,24 +22,30 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import io.netty.util.Recycler;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 
@@ -58,18 +64,19 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
 
     private final long acknowledgementGroupTimeMicros;
 
-    /**
-     * Latest cumulative ack sent to broker
-     */
-    private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) 
MessageId.earliest;
-    private volatile BitSetRecyclable lastCumulativeAckSet = null;
+    private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
+    private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
+
+    private volatile LastCumulativeAck lastCumulativeAck =
+            LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, 
null);
+
     private volatile boolean cumulativeAckFlushRequired = false;
 
-    private static final 
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, 
MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
-            .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, 
MessageIdImpl.class, "lastCumulativeAck");
-    private static final 
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, 
BitSetRecyclable> LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater
-        .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, 
BitSetRecyclable.class, "lastCumulativeAckSet");
+    // When we flush the command, we should ensure current ack request will 
send correct
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    private static final 
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, 
LastCumulativeAck> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
+            .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, 
LastCumulativeAck.class, "lastCumulativeAck");
 
     /**
      * This is a set of all the individual acks that the application has 
issued and that were not already sent to
@@ -78,10 +85,9 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
     private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
     private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
 
-    private final ConcurrentHashMap<TransactionImpl, 
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>> 
pendingIndividualTransactionBatchIndexAcks;
-    private final ConcurrentSkipListSet<Triple<Long, Long, MessageIdImpl>> 
pendingIndividualTransactionAcks;
-
     private final ScheduledFuture<?> scheduledTask;
+    private final boolean batchIndexAckEnabled;
+    private final boolean ackReceiptEnabled;
 
     public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, 
ConsumerConfigurationData<?> conf,
                                                     EventLoopGroup 
eventLoopGroup) {
@@ -89,8 +95,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
         this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
         this.acknowledgementGroupTimeMicros = 
conf.getAcknowledgementsGroupTimeMicros();
-        this.pendingIndividualTransactionBatchIndexAcks = new 
ConcurrentHashMap<>();
-        this.pendingIndividualTransactionAcks = new ConcurrentSkipListSet<>();
+        this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
+        this.ackReceiptEnabled = conf.isAckReceiptEnabled();
+        this.currentIndividualAckFuture = new TimedCompletableFuture<>();
+        this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
 
         if (acknowledgementGroupTimeMicros > 0) {
             scheduledTask = 
eventLoopGroup.next().scheduleWithFixedDelay(this::flush, 
acknowledgementGroupTimeMicros,
@@ -106,7 +114,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      */
     @Override
     public boolean isDuplicate(MessageId messageId) {
-        if (messageId.compareTo(lastCumulativeAck) <= 0) {
+        if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
@@ -115,186 +123,302 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
+                                                         AckType ackType, 
Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                pendingIndividualAcks.add(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = 
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
+                modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
-            }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
msgId;
-                    doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+                                                     Map<String, Long> 
properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all 
ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction 
subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new 
MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (batchMessageId.ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchIndexAck(batchMessageId, 
properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the 
batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && 
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatesInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, 
TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, 
properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new 
ConcurrentHashMap<>());
-                    bitSet = 
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl 
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex());
+        
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    }
+
+    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl 
messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, 
Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction 
subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, 
null);
+        } else {
+            if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                // any ack operation is allowed.
+                this.lock.readLock().lock();
+                try {
+                    doIndividualAckAsync(messageId);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
                 }
             } else {
-                bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
-                new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), 
msgId.getPartitionIndex()), (v) -> {
-                            ConcurrentBitSetRecyclable value;
-                            if (msgId.getAcker() != null && !(msgId.getAcker() 
instanceof BatchMessageAckerDisabled)) {
-                                value = 
ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
-                            } else {
-                                value = ConcurrentBitSetRecyclable.create();
-                                value.set(0, batchSize);
-                            }
-                            return value;
-                        });
-                bitSet.clear(batchIndex);
+                doIndividualAckAsync(messageId);
+                if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
-            if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        }
+    }
+
+
+    private void doIndividualAckAsync(MessageIdImpl messageId) {
+        pendingIndividualAcks.add(messageId);
+        pendingIndividualBatchIndexAcks.remove(messageId);
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId,
+                                                         Map<String, Long> 
properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            return doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
+                    batchMessageId.getBatchSize(), AckType.Individual, 
properties);
+        } else {
+            return doIndividualBatchAck(batchMessageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId) {
+        if (isAckReceiptEnabled(consumer.getClientCnx())) {
+            // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+            // change currentFuture. but we can lock by the read lock, because 
the currentFuture is not change
+            // any ack operation is allowed.
+            this.lock.readLock().lock();
+            try {
+                doIndividualBatchAckAsync(batchMessageId);
+                return this.currentIndividualAckFuture;
+            } finally {
+                this.lock.readLock().unlock();
             }
+        } else {
+            doIndividualBatchAckAsync(batchMessageId);
+            return CompletableFuture.completedFuture(null);
         }
     }
 
-    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) 
{
-        // Handle concurrent updates from different threads
-        while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, 
lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
-                        try {
-                            lastBitSet.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
+    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, 
Map<String, Long> properties,
+                                                    BitSetRecyclable bitSet) {
+        
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction 
subscription.
+            return doImmediateAck(messageId, AckType.Cumulative, properties, 
bitSet);
+        } else {
+            if (isAckReceiptEnabled(consumer.getClientCnx())) {
+                // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                // any ack operation is allowed.
+                this.lock.readLock().lock();
+                try {
+                    doCumulativeAckAsync(messageId, bitSet);
+                    return this.currentCumulativeAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualBatchIndexAcks.size() >= 
MAX_ACK_GROUP_SIZE) {
+                        flush();
                     }
-                    // Successfully updated the last cumulative ack. Next 
flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
                 }
             } else {
-                // message id acknowledging an before the current last 
cumulative ack
-                return;
+                doCumulativeAckAsync(messageId, bitSet);
+                if (pendingIndividualBatchIndexAcks.size() >= 
MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
         }
     }
 
-    private void doTransactionCumulativeAck(MessageIdImpl msgId, 
BitSetRecyclable bitSet) {
+    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+        ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
+                new MessageIdImpl(batchMessageId.getLedgerId(), 
batchMessageId.getEntryId(),
+                        batchMessageId.getPartitionIndex()), (v) -> {
+                    ConcurrentBitSetRecyclable value;
+                    if (batchMessageId.getAcker() != null &&
+                            !(batchMessageId.getAcker() instanceof 
BatchMessageAckerDisabled)) {
+                        value = 
ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+                    } else {
+                        value = ConcurrentBitSetRecyclable.create();
+                        value.set(0, batchMessageId.getBatchIndex());
+                    }
+                    return value;
+                });
+        bitSet.clear(batchMessageId.getBatchIndex());
+    }
+
+    private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable 
bitSet) {
         // Handle concurrent updates from different threads
+        LastCumulativeAck currentCumulativeAck = 
LastCumulativeAck.create(msgId, bitSet);
         while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, 
lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
+            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
+            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
+                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
this.lastCumulativeAck, currentCumulativeAck)) {
+                    if (lastCumulativeAck.bitSetRecyclable != null) {
                         try {
-                            lastBitSet.recycle();
+                            lastCumulativeAck.bitSetRecyclable.recycle();
                         } catch (Exception ignore) {
                             // no-op
                         }
+                        lastCumulativeAck.bitSetRecyclable = null;
                     }
+                    lastCumulativeAck.recycle();
                     // Successfully updated the last cumulative ack. Next 
flush iteration will send this to broker.
                     cumulativeAckFlushRequired = true;
                     return;
                 }
             } else {
+                currentCumulativeAck.recycle();
                 // message id acknowledging an before the current last 
cumulative ack
                 return;
             }
         }
     }
 
-    private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                   TransactionImpl transaction) {
+    private CompletableFuture<Void> 
doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
+                                                              Map<String, 
Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            return doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
+                    batchMessageId.getBatchSize(), AckType.Cumulative, 
properties);
+        } else {
+            BitSetRecyclable bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchMessageId.getBatchSize());
+            bitSet.clear(0, batchMessageId.getBatchIndex() + 1);
+            return doCumulativeAck(batchMessageId, null, bitSet);
+        }
+    }
+
+    private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId, 
AckType ackType, Map<String, Long> properties,
+                                                   BitSetRecyclable bitSet) {
         ClientCnx cnx = consumer.getClientCnx();
 
         if (cnx == null) {
-            return false;
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Consumer connect fail! consumer state:" 
+ consumer.getState()));
         }
-        if (transaction != null) {
-            newAckCommand(consumer.consumerId, msgId, null, ackType, null,
-                    properties, cnx, true /* flush */, 
transaction.getTxnIdMostBits(),
-                    transaction.getTxnIdLeastBits());
-        } else {
-            newAckCommand(consumer.consumerId, msgId, null, ackType, null,
-                    properties, cnx, true /* flush */, -1, -1);
-        }
-        return true;
+        return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, 
ackType, properties, cnx);
     }
 
-    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize, AckType ackType,
-                                             Map<String, Long> properties, 
long txnidMostBits, long txnidLeastBits) {
+    private CompletableFuture<Void> 
doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int 
batchSize, AckType ackType,
+                                             Map<String, Long> properties) {
         ClientCnx cnx = consumer.getClientCnx();
 
         if (cnx == null) {
-            return false;
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Consumer connect fail! consumer state:" 
+ consumer.getState()));
         }
         BitSetRecyclable bitSet;
         if (msgId.getAcker() != null && !(msgId.getAcker() instanceof 
BatchMessageAckerDisabled)) {
@@ -309,11 +433,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             bitSet.clear(batchIndex);
         }
 
-        final ByteBuf cmd = Commands.newAck(consumer.consumerId, 
msgId.ledgerId, msgId.entryId, bitSet, ackType,
-                null, properties, txnidLeastBits, txnidMostBits, -1);
+        CompletableFuture<Void> completableFuture = 
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
+                msgId.ledgerId, msgId.entryId, bitSet, ackType, null, 
properties, true, null, null);
         bitSet.recycle();
-        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-        return true;
+        return completableFuture;
     }
 
     /**
@@ -330,16 +453,32 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             return;
         }
 
+        if (isAckReceiptEnabled(consumer.getClientCnx())) {
+            this.lock.writeLock().lock();
+            try {
+                flushAsync(cnx);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } else {
+            flushAsync(cnx);
+        }
+    }
+
+    private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newAckCommand(consumer.consumerId, lastCumulativeAck, 
lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, 
false /* flush */, -1, -1);
-            shouldFlush=true;
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, 
lastCumulativeAck.messageId.ledgerId,
+                    lastCumulativeAck.messageId.getEntryId(), 
lastCumulativeAck.bitSetRecyclable,
+                    AckType.Cumulative, null, Collections.emptyMap(), false,
+                    this.currentCumulativeAckFuture, null);
+            
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+            shouldFlush = true;
             cumulativeAckFlushRequired = false;
         }
 
         // Flush all individual acks
         List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = 
new ArrayList<>(pendingIndividualAcks.size() + 
pendingIndividualBatchIndexAcks.size());
-        HashMap<TransactionImpl, List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>>> transactionEntriesToAck = new HashMap<>();
         if (!pendingIndividualAcks.isEmpty()) {
             if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 // We can send 1 single protobuf command with all individual 
acks
@@ -371,8 +510,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     if (msgId == null) {
                         break;
                     }
-
-                    newAckCommand(consumer.consumerId, msgId, null, 
AckType.Individual, null, Collections.emptyMap(), cnx, false, -1, -1);
+                    newMessageAckCommandAndWrite(cnx, consumer.consumerId, 
msgId.getLedgerId(), msgId.getEntryId(), null,
+                            AckType.Individual, null, Collections.emptyMap(), 
false, null, null);
                     shouldFlush = true;
                 }
             }
@@ -388,88 +527,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             }
         }
 
-        if (!pendingIndividualTransactionAcks.isEmpty()) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
-                // We can send 1 single protobuf command with all individual 
acks
-                while (true) {
-                    Triple<Long, Long, MessageIdImpl> entry = 
pendingIndividualTransactionAcks.pollFirst();
-                    if (entry == null) {
-                        break;
-                    }
-
-                    // if messageId is checked then all the chunked related to 
that msg also processed so, ack all of
-                    // them
-                    MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(entry.getRight());
-                    long mostSigBits = entry.getLeft();
-                    long leastSigBits = entry.getMiddle();
-                    MessageIdImpl messageId = entry.getRight();
-                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
-                        for (MessageIdImpl cMsgId : chunkMsgIds) {
-                            if (cMsgId != null) {
-                                newAckCommand(consumer.consumerId, cMsgId, 
null, AckType.Individual, null, Collections.emptyMap(), cnx, false, 
mostSigBits, leastSigBits);
-                            }
-                        }
-                        // messages will be acked so, remove checked message 
sequence
-                        
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
-                    } else {
-                        newAckCommand(consumer.consumerId, messageId, null, 
AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, 
leastSigBits);
-                    }
-                }
-            } else {
-                // When talking to older brokers, send the acknowledgements 
individually
-                while (true) {
-                    Triple<Long, Long, MessageIdImpl> entry = 
pendingIndividualTransactionAcks.pollFirst();
-                    if (entry == null) {
-                        break;
-                    }
-
-                    newAckCommand(consumer.consumerId, entry.getRight(), null, 
AckType.Individual,
-                            null, Collections.emptyMap(), cnx, false, 
entry.getLeft(), entry.getMiddle());
-                    shouldFlush = true;
-                }
-            }
-        }
-
-        if (!pendingIndividualTransactionBatchIndexAcks.isEmpty()) {
-            Iterator<Map.Entry<TransactionImpl, 
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>> 
transactionIterator = 
pendingIndividualTransactionBatchIndexAcks.entrySet().iterator();
-            while (transactionIterator.hasNext()) {
-                Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable>> transactionEntry = transactionIterator.next();
-                TransactionImpl txn = transactionEntry.getKey();
-                synchronized (txn) {
-                    if 
(pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) {
-                        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
messageIdBitSetList = new ArrayList<>();
-                        transactionEntriesToAck.put(txn, messageIdBitSetList);
-                        Iterator<Map.Entry<MessageIdImpl, 
ConcurrentBitSetRecyclable>> messageIdIterator = 
transactionEntry.getValue().entrySet().iterator();
-                        while (messageIdIterator.hasNext()) {
-                            Map.Entry<MessageIdImpl, 
ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next();
-                            ConcurrentBitSetRecyclable 
concurrentBitSetRecyclable =
-                                    
ConcurrentBitSetRecyclable.create(messageIdEntry.getValue());
-                            MessageIdImpl messageId = messageIdEntry.getKey();
-                            
messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId, 
concurrentBitSetRecyclable));
-                            messageIdEntry.getValue().set(0, 
messageIdEntry.getValue().size());
-                            messageIdIterator.remove();
-                        }
-                        transactionIterator.remove();
-                    }
-                }
-            }
-        }
-
-        if (transactionEntriesToAck.size() > 0) {
-            Iterator<Map.Entry<TransactionImpl, List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>>>> iterator =
-                    transactionEntriesToAck.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<TransactionImpl, List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>>> entry = iterator.next();
-                
cnx.ctx().write(Commands.newMultiTransactionMessageAck(consumer.consumerId,
-                        new TxnID(entry.getKey().getTxnIdMostBits(),
-                                entry.getKey().getTxnIdLeastBits()), 
entry.getValue()), cnx.ctx().voidPromise());
-                shouldFlush = true;
-            }
-        }
-
         if (entriesToAck.size() > 0) {
-            cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck),
-                cnx.ctx().voidPromise());
+
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
+                    null, AckType.Individual, null, null, true, 
currentIndividualAckFuture, entriesToAck);
             shouldFlush = true;
         }
 
@@ -480,12 +541,13 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             }
             cnx.ctx().flush();
         }
+
     }
 
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) 
MessageIdImpl.earliest, null);
         pendingIndividualAcks.clear();
     }
 
@@ -497,46 +559,127 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, 
BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, 
Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long 
txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+                                                            BitSetRecyclable 
bitSet, AckType ackType,
+                                                            Map<String, Long> 
map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
-                }
+                completableFuture = newMessageAckCommandAndWrite(cnx, 
consumer.consumerId, 0L, 0L,
+                        null, ackType, null, null, true, null, entriesToAck);
             } else {
+                // if don't support multi message ack, it also support ack 
receipt, so we should not think about the
+                // ack receipt in this logic
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
-                    ByteBuf cmd = Commands.newAck(consumerId, 
cMsgId.getLedgerId(), cMsgId.getEntryId(),
-                            lastCumulativeAckSet, ackType, validationError, 
map);
-                    if (flush) {
-                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                    } else {
-                        cnx.ctx().write(cmd, cnx.ctx().voidPromise());
-                    }
+                    newMessageAckCommandAndWrite(cnx, consumerId, 
cMsgId.getLedgerId(), cMsgId.getEntryId(),
+                            bitSet, ackType, null, map, true, null, null);
+                }
+                completableFuture = CompletableFuture.completedFuture(null);
+            }
+        } else {
+            completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, 
msgId.ledgerId, msgId.getEntryId(),
+                    bitSet, ackType, null, map, true, null, null);
+        }
+        return completableFuture;
+    }
+
+    private CompletableFuture<Void> newMessageAckCommandAndWrite(ClientCnx 
cnx, long consumerId, long ledgerId,
+                                                                 long entryId, 
BitSetRecyclable ackSet, AckType ackType,
+                                                                 
CommandAck.ValidationError validationError,
+                                                                 Map<String, 
Long> properties, boolean flush,
+                                                                 
TimedCompletableFuture<Void> timedCompletableFuture,
+                                                                 
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
+        if (isAckReceiptEnabled(consumer.getClientCnx())) {
+            final long requestId = consumer.getClient().newRequestId();
+            final ByteBuf cmd;
+            if (entriesToAck == null) {
+                cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
+                        ackType, null, properties, requestId);
+            } else {
+                cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, 
requestId);
+            }
+            if (timedCompletableFuture == null) {
+                return cnx.newAckForReceipt(cmd, requestId);
+            } else {
+                if (ackType == AckType.Individual) {
+                    this.currentIndividualAckFuture = new 
TimedCompletableFuture<>();
+                } else {
+                    this.currentCumulativeAckFuture = new 
TimedCompletableFuture<>();
                 }
+                cnx.newAckForReceiptWithFuture(cmd, requestId, 
timedCompletableFuture);
+                return timedCompletableFuture;
             }
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
         } else {
-            ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), 
msgId.getEntryId(), lastCumulativeAckSet,
-                    ackType, validationError, map, txnidLeastBits, 
txnidMostBits, -1);
+            // client cnx don't support ack receipt, if we don't complete the 
future, the client will block.
+            if (ackReceiptEnabled) {
+                synchronized (PersistentAcknowledgmentsGroupingTracker.this) {
+                    if (!this.currentCumulativeAckFuture.isDone()) {
+                        this.currentCumulativeAckFuture.complete(null);
+                    }
+
+                    if (!this.currentIndividualAckFuture.isDone()) {
+                        this.currentIndividualAckFuture.complete(null);
+                    }
+                }
+            }
+            final ByteBuf cmd;
+            if (entriesToAck == null) {
+                cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
+                        ackType, null, properties, -1);
+            } else {
+                cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, 
-1);
+            }
             if (flush) {
                 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
             } else {
                 cnx.ctx().write(cmd, cnx.ctx().voidPromise());
             }
+            return CompletableFuture.completedFuture(null);
         }
     }
+
+    private boolean isAckReceiptEnabled(ClientCnx cnx) {
+        return ackReceiptEnabled && cnx != null
+                && 
Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
+    }
+
+    private static class LastCumulativeAck {
+        private MessageIdImpl messageId;
+        private BitSetRecyclable bitSetRecyclable;
+
+        static LastCumulativeAck create(MessageIdImpl messageId, 
BitSetRecyclable bitSetRecyclable) {
+            LastCumulativeAck op = RECYCLER.get();
+            op.messageId = messageId;
+            op.bitSetRecyclable = bitSetRecyclable;
+            return op;
+        }
+
+        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> 
recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        void recycle() {
+            if (bitSetRecyclable != null) {
+                this.bitSetRecyclable.recycle();
+            }
+            this.messageId = null;
+            recyclerHandle.recycle(this);
+        }
+
+        private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
+        private static final Recycler<LastCumulativeAck> RECYCLER = new 
Recycler<LastCumulativeAck>() {
+            @Override
+            protected LastCumulativeAck newObject(Handle<LastCumulativeAck> 
handle) {
+                return new LastCumulativeAck(handle);
+            }
+        };
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 665970c..b01cba4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -129,6 +129,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private boolean batchIndexAckEnabled = false;
 
+    private boolean ackReceiptEnabled = false;
+
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit 
timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
         this.autoUpdatePartitionsIntervalSeconds = 
timeUnit.toSeconds(interval);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 9d95e98..a780a88 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -18,24 +18,31 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class AcknowledgementsGroupingTrackerTest {
@@ -45,25 +52,38 @@ public class AcknowledgementsGroupingTrackerTest {
     private EventLoopGroup eventLoopGroup;
 
     @BeforeClass
-    public void setup() {
+    public void setup() throws NoSuchFieldException, IllegalAccessException {
         eventLoopGroup = new NioEventLoopGroup(1);
         consumer = mock(ConsumerImpl.class);
         consumer.unAckedChunkedMessageIdSequenceMap = new 
ConcurrentOpenHashMap<>();
-        cnx = mock(ClientCnx.class);
+        cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new 
NioEventLoopGroup()));
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        doReturn(client).when(consumer).getClient();
+        doReturn(cnx).when(consumer).getClientCnx();
+        doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
+        doReturn(new UnAckedMessageTracker().UNACKED_MESSAGE_TRACKER_DISABLED)
+                .when(consumer).getUnAckedMessageTracker();
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
         when(cnx.ctx()).thenReturn(ctx);
     }
 
+    @DataProvider(name = "isNeedReceipt")
+    public Object[][] isNeedReceipt() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
     @AfterClass
     public void teardown() {
         eventLoopGroup.shutdownGracefully();
     }
 
-    @Test
-    public void testAckTracker() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testAckTracker(boolean isNeedReceipt) throws Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -74,12 +94,12 @@ public class AcknowledgementsGroupingTrackerTest {
 
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg1));
 
         assertFalse(tracker.isDuplicate(msg2));
 
-        tracker.addAcknowledgment(msg5, AckType.Cumulative, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg5, AckType.Cumulative, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg1));
         assertTrue(tracker.isDuplicate(msg2));
         assertTrue(tracker.isDuplicate(msg3));
@@ -99,7 +119,7 @@ public class AcknowledgementsGroupingTrackerTest {
         assertTrue(tracker.isDuplicate(msg5));
         assertFalse(tracker.isDuplicate(msg6));
 
-        tracker.addAcknowledgment(msg6, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg6, AckType.Individual, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg6));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -117,11 +137,13 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
-    @Test
-    public void testBatchAckTracker() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testBatchAckTracker(boolean isNeedReceipt) throws Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -175,11 +197,13 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
-    @Test
-    public void testImmediateAckingTracker() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testImmediateAckingTracker(boolean isNeedReceipt) throws 
Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(0);
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -188,7 +212,7 @@ public class AcknowledgementsGroupingTrackerTest {
 
         when(consumer.getClientCnx()).thenReturn(null);
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap());
         assertFalse(tracker.isDuplicate(msg1));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -196,17 +220,19 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.flush();
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg2, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg2, AckType.Individual, 
Collections.emptyMap());
         // Since we were connected, the ack went out immediately
         assertFalse(tracker.isDuplicate(msg2));
         tracker.close();
     }
 
-    @Test
-    public void testImmediateBatchAckingTracker() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws 
Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(0);
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0);
@@ -216,8 +242,6 @@ public class AcknowledgementsGroupingTrackerTest {
         when(consumer.getClientCnx()).thenReturn(null);
 
         tracker.addListAcknowledgment(Collections.singletonList(msg1), 
AckType.Individual, Collections.emptyMap());
-        tracker.flush();
-        //cnx is null can not flush
         assertTrue(tracker.isDuplicate(msg1));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -226,16 +250,20 @@ public class AcknowledgementsGroupingTrackerTest {
         assertFalse(tracker.isDuplicate(msg1));
 
         tracker.addListAcknowledgment(Collections.singletonList(msg2), 
AckType.Individual, Collections.emptyMap());
+
+        tracker.flush();
         // Since we were connected, the ack went out immediately
         assertFalse(tracker.isDuplicate(msg2));
         tracker.close();
     }
 
-    @Test
-    public void testAckTrackerMultiAck() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testAckTrackerMultiAck(boolean isNeedReceipt) throws Exception 
{
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         
when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
 
@@ -248,12 +276,12 @@ public class AcknowledgementsGroupingTrackerTest {
 
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg1));
 
         assertFalse(tracker.isDuplicate(msg2));
 
-        tracker.addAcknowledgment(msg5, AckType.Cumulative, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg5, AckType.Cumulative, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg1));
         assertTrue(tracker.isDuplicate(msg2));
         assertTrue(tracker.isDuplicate(msg3));
@@ -273,7 +301,7 @@ public class AcknowledgementsGroupingTrackerTest {
         assertTrue(tracker.isDuplicate(msg5));
         assertFalse(tracker.isDuplicate(msg6));
 
-        tracker.addAcknowledgment(msg6, AckType.Individual, 
Collections.emptyMap(), null);
+        tracker.addAcknowledgment(msg6, AckType.Individual, 
Collections.emptyMap());
         assertTrue(tracker.isDuplicate(msg6));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -291,11 +319,13 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
-    @Test
-    public void testBatchAckTrackerMultiAck() throws Exception {
+    @Test(dataProvider = "isNeedReceipt")
+    public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws 
Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
-        PersistentAcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        conf.setAckReceiptEnabled(isNeedReceipt);
+        AcknowledgmentsGroupingTracker tracker;
+        tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
 
         
when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
 
@@ -350,4 +380,21 @@ public class AcknowledgementsGroupingTrackerTest {
 
         tracker.close();
     }
+
+    public class ClientCnxTest extends ClientCnx {
+
+        public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) {
+            super(conf, eventLoopGroup);
+        }
+
+        @Override
+        public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long 
requestId) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
+                                               TimedCompletableFuture<Void> 
future) {
+        }
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
index d0bd9b5..eb5207b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -23,6 +23,7 @@ import io.netty.channel.*;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.*;
 
@@ -86,6 +87,18 @@ public class ClientCnxRequestTimeoutQueueTest {
     }
 
     @Test
+    void testNewAckForResponseNoFlushTimeout() {
+        assertFutureTimesOut(cnx.newAckForReceipt(requestMessage, 1L));
+    }
+
+    @Test
+    void testNewAckForResponseFlushTimeout() {
+        TimedCompletableFuture<Void> timedCompletableFuture = new 
TimedCompletableFuture<>();
+        cnx.newAckForReceiptWithFuture(requestMessage, 1L, 
timedCompletableFuture);
+        assertFutureTimesOut(timedCompletableFuture);
+    }
+
+    @Test
     void testGetSchemaRequestTimeout() {
         assertFutureTimesOut(cnx.sendGetRawSchema(requestMessage, 1L));
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e16ed9e..a29ceb1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -858,18 +858,22 @@ public class Commands {
     }
 
     public static ByteBuf newMultiMessageAck(long consumerId,
-            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
+                                             List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>> entries,
+                                             long requestId) {
         BaseCommand cmd = newMultiMessageAckCommon(entries);
         cmd.getAck()
                 .setConsumerId(consumerId)
                 .setAckType(AckType.Individual);
+            if (requestId >= 0) {
+                cmd.getAck().setRequestId(requestId);
+            }
         return serializeWithSize(cmd);
     }
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, 
BitSetRecyclable ackSet, AckType ackType,
-                                 ValidationError validationError, Map<String, 
Long> properties) {
+                                 ValidationError validationError, Map<String, 
Long> properties, long requestId) {
         return newAck(consumerId, ledgerId, entryId, ackSet, ackType, 
validationError,
-                properties, -1L, -1L, -1L, -1);
+                properties, -1L, -1L, requestId, -1);
     }
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, 
BitSetRecyclable ackSet, AckType ackType,
@@ -1693,6 +1697,10 @@ public class Commands {
         return peerVersion >= ProtocolVersion.v15.getValue();
     }
 
+    public static boolean peerSupportsAckReceipt(int peerVersion) {
+        return peerVersion >= ProtocolVersion.v17.getValue();
+    }
+
     private static org.apache.pulsar.common.api.proto.ProducerAccessMode 
convertProducerAccessMode(ProducerAccessMode accessMode) {
         switch (accessMode) {
         case Exclusive:
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5f3d427..ee5ac70 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -255,6 +255,7 @@ enum ProtocolVersion {
               // Added Key_Shared subscription
     v15 = 15; // Add CommandGetOrCreateSchema and 
CommandGetOrCreateSchemaResponse
     v16 = 16; // Add support for raw message metadata
+    v17 = 17; // Added support ack receipt
 }
 
 message CommandConnect {

Reply via email to