codelipenghui commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557265929



##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
##########
@@ -864,6 +864,32 @@ public TransactionConflictException(String msg) {
         }
     }
 
+    /**
+     * Consumer ack for response timeout.
+     */
+    public static class AckResponseTimeoutException extends 
PulsarClientException {

Review comment:
       Is the `PulsarClientException.TimeoutException` works?

##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
##########
@@ -186,6 +186,14 @@
      */
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
+    /**
+     * Ack will return response but does not mean that the message will not be 
resent after get response.
+     *
+     * @param ackResponseEnabled {@link Boolean} is enable ack for response
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);

Review comment:
       ```suggestion
       ConsumerBuilder<T> isAckReceiptEnabled(boolean ackReceiptEnabled);
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -330,16 +462,41 @@ public void flush() {
             return;
         }
 
+        if (ackResponseEnabled) {
+            this.lock.writeLock().lock();
+            try {
+                flushAsync(cnx);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } else {
+            flushAsync(cnx);
+        }
+    }
+
+    private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newAckCommand(consumer.consumerId, lastCumulativeAck, 
lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, 
false /* flush */, -1, -1);
+            if (ackResponseEnabled) {
+                long requestId = consumer.getClient().newRequestId();
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, 
lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), 
lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), 
requestId);
+                cnx.newAckForResponseWithFuture(cmd, requestId, 
currentCumulativeAckFuture);
+                this.currentCumulativeAckFuture = new 
TimedCompletableFuture<>();
+            } else {
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, 
lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), 
lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), -1);
+                cnx.ctx().write(cmd, cnx.ctx().voidPromise());

Review comment:
       Please optimize duplicate code

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
+                                                         AckType ackType, 
Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                pendingIndividualAcks.add(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
msgId;
-                    doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+                                                     Map<String, Long> 
properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all 
ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction 
subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new 
MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchAck(batchMessageId, 
properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the 
batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && 
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatusInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, 
TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, 
properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new 
ConcurrentHashMap<>());
-                    bitSet = 
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl 
modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex());
+        
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        modifyMessageIdStatusInConsumerCommon(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        modifyMessageIdStatusInConsumerCommon(messageId);
+    }
+
+    private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl 
messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }

Review comment:
       These methods not clear here. Stats, not status. And should split the 
update stats method and cleanup consumer method stay independent, this will 
improve the code readability

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, 
BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, 
Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long 
txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+                                                            BitSetRecyclable 
bitSet, AckType ackType,
+                                                            Map<String, Long> 
map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);

Review comment:
       Should write the command to the broker?  And please consider reducing 
the duplicate code 

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -816,25 +833,35 @@ SocketAddress serverAddrees() {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long 
requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command, true,
+                new TimedCompletableFuture<>());
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId, RequestType requestType) {
-        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId,
+                                                                 RequestType 
requestType, boolean flush,
+                                                                 
TimedCompletableFuture<T> future) {

Review comment:
       Do we need to return the future again?

##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
##########
@@ -186,6 +186,14 @@
      */
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
+    /**
+     * Ack will return response but does not mean that the message will not be 
resent after get response.
+     *
+     * @param ackResponseEnabled {@link Boolean} is enable ack for response
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);

Review comment:
       I think `response` here a little bit ambiguous since the client also can 
get the returned future without this feature. 

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> 
messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
                                                          AckType ackType, 
Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                pendingIndividualAcks.add(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
msgId;
-                    doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits());
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+                                                     Map<String, Long> 
properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all 
ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
+                    // all ack complete
                     return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction 
subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new 
MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {

Review comment:
       line 183 is checked, I think @congbobo184 you can use `batchMessageId` 
since you already convert to `BatchMessageIdImpl` in 184

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -816,25 +833,35 @@ SocketAddress serverAddrees() {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long 
requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, 
RequestType.Command, true,
+                new TimedCompletableFuture<>());
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId, RequestType requestType) {
-        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId,
+                                                                 RequestType 
requestType, boolean flush,
+                                                                 
TimedCompletableFuture<T> future) {

Review comment:
       I think you can use 2 methods to handle this case
   
   ```
   private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long 
requestId,
                                                                    RequestType 
requestType, boolean flush,
                                                                    
TimedCompletableFuture<T> future) {
   
   }
   ```
   
   ```
   private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId, RequestType requestType, boolean flush) {
          TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
           sendRequestAndHandleTimeout(... future);
           return future;
   }
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, 
BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, 
Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long 
txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+                                                            BitSetRecyclable 
bitSet, AckType ackType,
+                                                            Map<String, Long> 
map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = 
CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack 
response, so we should not think about the
+                // ack response in this logic
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     ByteBuf cmd = Commands.newAck(consumerId, 
cMsgId.getLedgerId(), cMsgId.getEntryId(),
-                            lastCumulativeAckSet, ackType, validationError, 
map);
-                    if (flush) {
-                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                    } else {
-                        cnx.ctx().write(cmd, cnx.ctx().voidPromise());
-                    }
+                            bitSet, ackType, null, map, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                 }
+                completableFuture = CompletableFuture.completedFuture(null);
             }
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
         } else {
-            ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), 
msgId.getEntryId(), lastCumulativeAckSet,
-                    ackType, validationError, map, txnidLeastBits, 
txnidMostBits, -1);
-            if (flush) {
-                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+            if (ackResponseEnabled) {
+                long requestId = consumer.getClient().newRequestId();
+                ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), 
msgId.getEntryId(), bitSet,
+                        ackType, null, map, requestId);
+                completableFuture = cnx.newAckForResponse(cmd, requestId);
             } else {
-                cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), 
msgId.getEntryId(), bitSet,
+                        ackType, null, map, -1);
+                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                completableFuture = CompletableFuture.completedFuture(null);
+            }
+        }
+        return completableFuture;

Review comment:
       I think we can use a method `getRequestId()` and a method 
`getCompletableFuture()`. I think this will make the logic simpler

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, 
BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, 
Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long 
txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+                                                            BitSetRecyclable 
bitSet, AckType ackType,
+                                                            Map<String, Long> 
map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = 
CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack 
response, so we should not think about the
+                // ack response in this logic

Review comment:
       Why only support for muti message ack. Single message acknowledge also 
can enable ack receipt?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> 
messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
                                                          AckType ackType, 
Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();

Review comment:
       +1

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
+                                                         AckType ackType, 
Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                pendingIndividualAcks.add(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
msgId;
-                    doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+                                                     Map<String, Long> 
properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all 
ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction 
subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new 
MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchAck(batchMessageId, 
properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the 
batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && 
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatusInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, 
TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, 
properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new 
ConcurrentHashMap<>());
-                    bitSet = 
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl 
modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex());
+        
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        modifyMessageIdStatusInConsumerCommon(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        modifyMessageIdStatusInConsumerCommon(messageId);
+    }
+
+    private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl 
messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, 
Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction 
subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, 
null);
+        } else {
+            if (ackResponseEnabled) {
+                // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                // any ack operation is allowed.
+                this.lock.readLock().lock();
+                try {
+                    doIndividualAckAsync(messageId);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
                 }
             } else {
-                bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
-                new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), 
msgId.getPartitionIndex()), (v) -> {
-                            ConcurrentBitSetRecyclable value;
-                            if (msgId.getAcker() != null && !(msgId.getAcker() 
instanceof BatchMessageAckerDisabled)) {
-                                value = 
ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
-                            } else {
-                                value = ConcurrentBitSetRecyclable.create();
-                                value.set(0, batchSize);
-                            }
-                            return value;
-                        });
-                bitSet.clear(batchIndex);
+                doIndividualAckAsync(messageId);
+                if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
-            if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        }
+    }
+
+
+    private void doIndividualAckAsync(MessageIdImpl messageId) {
+        pendingIndividualAcks.add(messageId);
+        pendingIndividualBatchIndexAcks.remove(messageId);
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId,
+                                                         Map<String, Long> 
properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            return doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
+                    batchMessageId.getBatchSize(), AckType.Individual, 
properties);
+        } else {
+            return doIndividualBatchAck(batchMessageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId) {
+        if (ackResponseEnabled) {
+            // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+            // change currentFuture. but we can lock by the read lock, because 
the currentFuture is not change
+            // any ack operation is allowed.
+            this.lock.readLock().lock();
+            try {
+                doIndividualBatchAckAsync(batchMessageId);
+                return this.currentIndividualAckFuture;
+            } finally {
+                this.lock.readLock().unlock();
             }
+        } else {
+            doIndividualBatchAckAsync(batchMessageId);
+            return CompletableFuture.completedFuture(null);
         }
     }
 
-    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) 
{
-        // Handle concurrent updates from different threads
-        while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, 
lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
-                        try {
-                            lastBitSet.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
+    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, 
Map<String, Long> properties,
+                                                    BitSetRecyclable bitSet) {
+        
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction 
subscription.
+            return doImmediateAck(messageId, AckType.Cumulative, properties, 
bitSet);
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    doCumulativeAckAsync(messageId, bitSet);
+                    return this.currentCumulativeAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualBatchIndexAcks.size() >= 
MAX_ACK_GROUP_SIZE) {
+                        flush();
                     }
-                    // Successfully updated the last cumulative ack. Next 
flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
                 }
             } else {
-                // message id acknowledging an before the current last 
cumulative ack
-                return;
+                doCumulativeAckAsync(messageId, bitSet);
+                if (pendingIndividualBatchIndexAcks.size() >= 
MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
         }
     }
 
-    private void doTransactionCumulativeAck(MessageIdImpl msgId, 
BitSetRecyclable bitSet) {
+    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+        ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
+                new MessageIdImpl(batchMessageId.getLedgerId(), 
batchMessageId.getEntryId(),
+                        batchMessageId.getPartitionIndex()), (v) -> {
+                    ConcurrentBitSetRecyclable value;
+                    if (batchMessageId.getAcker() != null &&
+                            !(batchMessageId.getAcker() instanceof 
BatchMessageAckerDisabled)) {
+                        value = 
ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+                    } else {
+                        value = ConcurrentBitSetRecyclable.create();
+                        value.set(0, batchMessageId.getBatchIndex());
+                    }
+                    return value;
+                });
+        bitSet.clear(batchMessageId.getBatchIndex());
+    }
+
+    private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable 
bitSet) {
         // Handle concurrent updates from different threads
+        LastCumulativeAck currentCumulativeAck = 
LastCumulativeAck.create(msgId, bitSet);
         while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, 
lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
+            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
+            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
+                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
this.lastCumulativeAck, currentCumulativeAck)) {
+                    if (lastCumulativeAck.bitSetRecyclable != null) {
                         try {
-                            lastBitSet.recycle();
+                            lastCumulativeAck.bitSetRecyclable.recycle();
                         } catch (Exception ignore) {
                             // no-op
                         }
+                        lastCumulativeAck.bitSetRecyclable = null;
                     }
+                    lastCumulativeAck.recycle();
                     // Successfully updated the last cumulative ack. Next 
flush iteration will send this to broker.
                     cumulativeAckFlushRequired = true;
                     return;
                 }
             } else {
+                currentCumulativeAck.recycle();
                 // message id acknowledging an before the current last 
cumulative ack
                 return;
             }
         }
     }
 
-    private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                   TransactionImpl transaction) {
+    private CompletableFuture<Void> doCumulativeBatchAck(BatchMessageIdImpl 
batchMessageId,

Review comment:
       ```suggestion
       private CompletableFuture<Void> 
doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to