congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557839031



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, 
BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, 
Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long 
txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+                                                            BitSetRecyclable 
bitSet, AckType ackType,
+                                                            Map<String, Long> 
map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, 
entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = 
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = 
CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack 
response, so we should not think about the
+                // ack response in this logic

Review comment:
       This is the chunked-message path, so if we want to use the ack response, 
the broker must support multi-message ack. We don't need to handle the case 
where the broker doesn't support multi-message ack and we still return an ack 
response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to