congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r911909322
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -497,11 +497,30 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
         return acknowledgeAsync(messageIds);
     }
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
+        List<MessageId> messageIds = new ArrayList<>(messages.size());
+        for (Message<?> message: messages) {
+            try {
+                validateMessageId(message);
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
+            }
+            messageIds.add(message.getMessageId());
+        }
Review Comment:
This is the same as `public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`.
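One way to address the duplication flagged above is to move the validate-and-collect loop into a single helper that both overloads call. The following is only a stand-alone sketch, not the PR's code: `Msg`, `InvalidMessageException`, `collectValidatedIds`, and `doAcknowledge` are hypothetical stand-ins (for `Message<?>`, `PulsarClientException`, and the real acknowledge path) so that the example compiles without the Pulsar client on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-alone sketch. All types here are hypothetical stand-ins, not Pulsar classes.
class AckDedupSketch {

    // Stand-in for Message<?>: an id plus a flag that decides whether validation passes.
    static final class Msg {
        final String id;
        final boolean valid;
        Msg(String id, boolean valid) { this.id = id; this.valid = valid; }
    }

    // Stand-in for PulsarClientException.
    static final class InvalidMessageException extends Exception {
        InvalidMessageException(String message) { super(message); }
    }

    static void validateMessageId(Msg message) throws InvalidMessageException {
        if (!message.valid) {
            throw new InvalidMessageException("invalid message id: " + message.id);
        }
    }

    // Hypothetical shared helper: the loop that both overloads currently repeat.
    static List<String> collectValidatedIds(List<Msg> messages) throws InvalidMessageException {
        List<String> ids = new ArrayList<>(messages.size());
        for (Msg message : messages) {
            validateMessageId(message);
            ids.add(message.id);
        }
        return ids;
    }

    // Non-transactional overload.
    static CompletableFuture<Void> acknowledgeAsync(List<Msg> messages) {
        return acknowledgeAsync(messages, null);
    }

    // Transactional overload; txn is a plain String here purely for illustration.
    static CompletableFuture<Void> acknowledgeAsync(List<Msg> messages, String txn) {
        final List<String> ids;
        try {
            ids = collectValidatedIds(messages);
        } catch (InvalidMessageException e) {
            // Surface the validation failure through the returned future.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return doAcknowledge(ids, txn);
    }

    // Placeholder for the real acknowledge path; it completes immediately.
    static CompletableFuture<Void> doAcknowledge(List<String> ids, String txn) {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        List<Msg> ok = Arrays.asList(new Msg("1:0", true), new Msg("1:1", true));
        List<Msg> bad = Arrays.asList(new Msg("1:0", true), new Msg("1:1", false));
        System.out.println(acknowledgeAsync(ok).isCompletedExceptionally());
        System.out.println(acknowledgeAsync(bad, "txn-1").isCompletedExceptionally());
    }
}
```

Whether the non-transactional overload should delegate with a `null` transaction, as this sketch does, or call the helper on its own is a design choice for the PR author; the sketch only shows that the loop needs to exist once.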
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2699,6 +2717,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
Review Comment:
This is the same as the existing single-message overload:
```
private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                    ValidationError validationError,
                                                                    Map<String, Long> properties, TxnID txnID) {
```
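The batch-index bookkeeping that both overloads repeat could likewise live in one helper. The sketch below is an illustration under stated assumptions, not the PR's implementation: it uses `java.util.BitSet` in place of `BitSetRecyclable`, the helper name `buildAckSet` is hypothetical, and the individual-ack branch is a guess because the quoted diff is cut off after the `set(...)` call. It also leaves out the `batchMessageId.ackCumulative()` side effect seen in the diff.

```java
import java.util.BitSet;

// Stand-alone sketch using java.util.BitSet instead of Pulsar's BitSetRecyclable.
class AckBitSetSketch {

    enum AckType { Individual, Cumulative }

    // Hypothetical helper: builds the bit set for one batch message id.
    // A set bit means "this batch index is still pending".
    static BitSet buildAckSet(AckType ackType, int batchSize, int batchIndex) {
        BitSet ackSet = new BitSet(batchSize);
        // Both branches in the quoted diff start by marking every index as pending.
        ackSet.set(0, batchSize);
        if (ackType == AckType.Cumulative) {
            // As in the diff: clear indexes 0..batchIndex inclusive.
            ackSet.clear(0, batchIndex + 1);
        } else {
            // ASSUMPTION: the diff is truncated here; clearing only the acked
            // index is a plausible continuation, not a confirmed one.
            ackSet.clear(batchIndex);
        }
        return ackSet;
    }

    public static void main(String[] args) {
        System.out.println(buildAckSet(AckType.Cumulative, 5, 2));
        System.out.println(buildAckSet(AckType.Individual, 5, 2));
    }
}
```

With a helper of this shape, the single-message and list-based versions of `doTransactionAcknowledgeForResponse` would differ only in how they iterate over ids and assemble the command.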