congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r901420562
##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -1011,6 +1011,37 @@ public static ByteBuf newAck(long consumerId, long
ledgerId, long entryId, BitSe
return serializeWithSize(cmd);
}
+ public static ByteBuf newAck(long consumerId, List<MessageIdData>
messageIds, AckType ackType,
+ ValidationError validationError, Map<String,
Long> properties, long txnIdLeastBits,
+ long txnIdMostBits, long requestId) {
+ BaseCommand cmd = localCmd(Type.ACK);
+ CommandAck ack = cmd.setAck()
+ .setConsumerId(consumerId)
+ .setAckType(ackType);
+ ack.addAllMessageIds(messageIds);
+
+ if (validationError != null) {
+ ack.setValidationError(validationError);
+ }
+ if (txnIdMostBits >= 0) {
+ ack.setTxnidMostBits(txnIdMostBits);
+ }
+ if (txnIdLeastBits >= 0) {
+ ack.setTxnidLeastBits(txnIdLeastBits);
+ }
+
+ if (requestId >= 0) {
+ ack.setRequestId(requestId);
+ }
+ if (!properties.isEmpty()) {
+ properties.forEach((k, v) -> {
+ ack.addProperty().setKey(k).setValue(v);
+ });
+ }
Review Comment:
This is the same as the existing method:
```
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId,
        BitSetRecyclable ackSet, AckType ackType,
        ValidationError validationError,
        Map<String, Long> properties, long txnIdLeastBits,
        long txnIdMostBits, long requestId, int batchSize) {
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2682,6 +2700,58 @@ private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(MessageId me
return cnx().newAckForReceipt(cmd, requestId);
}
+ private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+
ValidationError validationError,
+
Map<String, Long> properties, TxnID txnID) {
+ BitSetRecyclable bitSetRecyclable = null;
+ long ledgerId;
+ long entryId;
+ ByteBuf cmd;
+ long requestId = client.newRequestId();
+ List<MessageIdData> messageIdDataList = new LinkedList<>();
+ for (MessageId messageId : messageIds) {
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
+ bitSetRecyclable = BitSetRecyclable.create();
+ if (ackType == AckType.Cumulative) {
+ batchMessageId.ackCumulative();
+ bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+ bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() +
1);
+ } else {
+ bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+ bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+ }
+ MessageIdData messageIdData = new MessageIdData();
+ messageIdData.setLedgerId(batchMessageId.getLedgerId());
+ messageIdData.setEntryId(batchMessageId.getEntryId());
+ messageIdData.setBatchSize(batchMessageId.getBatchSize());
+ long[] as = bitSetRecyclable.toLongArray();
+ for (int i = 0; i < as.length; i++) {
+ messageIdData.addAckSet(as[i]);
Review Comment:
This is the same as the existing method:
```
private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
        ValidationError validationError,
        Map<String, Long> properties, TxnID txnID) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]