congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r901420562


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -1011,6 +1011,37 @@ public static ByteBuf newAck(long consumerId, long 
ledgerId, long entryId, BitSe
         return serializeWithSize(cmd);
     }
 
+    public static ByteBuf newAck(long consumerId, List<MessageIdData> 
messageIds, AckType ackType,
+                                 ValidationError validationError, Map<String, 
Long> properties, long txnIdLeastBits,
+                                 long txnIdMostBits, long requestId) {
+        BaseCommand cmd = localCmd(Type.ACK);
+        CommandAck ack = cmd.setAck()
+                .setConsumerId(consumerId)
+                .setAckType(ackType);
+        ack.addAllMessageIds(messageIds);
+
+        if (validationError != null) {
+            ack.setValidationError(validationError);
+        }
+        if (txnIdMostBits >= 0) {
+            ack.setTxnidMostBits(txnIdMostBits);
+        }
+        if (txnIdLeastBits >= 0) {
+            ack.setTxnidLeastBits(txnIdLeastBits);
+        }
+
+        if (requestId >= 0) {
+            ack.setRequestId(requestId);
+        }
+        if (!properties.isEmpty()) {
+            properties.forEach((k, v) -> {
+                ack.addProperty().setKey(k).setValue(v);
+            });
+        }

Review Comment:
   same as 
   ```
       public static ByteBuf newAck(long consumerId, long ledgerId, long 
entryId, BitSetRecyclable ackSet, AckType ackType,
                                    ValidationError validationError, 
Map<String, Long> properties, long txnIdLeastBits,
                                    long txnIdMostBits, long requestId, int 
batchSize) {
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2682,6 +2700,58 @@ private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(MessageId me
         return cnx().newAckForReceipt(cmd, requestId);
     }
 
+    private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        
ValidationError validationError,
+                                                                        
Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 
1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                }
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(batchMessageId.getLedgerId());
+                messageIdData.setEntryId(batchMessageId.getEntryId());
+                messageIdData.setBatchSize(batchMessageId.getBatchSize());
+                long[] as = bitSetRecyclable.toLongArray();
+                for (int i = 0; i < as.length; i++) {
+                    messageIdData.addAckSet(as[i]);

Review Comment:
   same as
   
   ```
   private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                           
ValidationError validationError,
                                                                           
Map<String, Long> properties, TxnID txnID) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to