congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557839031
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
}
}
- private void newAckCommand(long consumerId, MessageIdImpl msgId,
BitSetRecyclable lastCumulativeAckSet,
- AckType ackType, ValidationError validationError, Map<String,
Long> map, ClientCnx cnx,
- boolean flush, long txnidMostBits, long
txnidLeastBits) {
-
- MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
- if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
- if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
- && ackType != AckType.Cumulative) {
+ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId,
MessageIdImpl msgId,
+ BitSetRecyclable
bitSet, AckType ackType,
+ Map<String, Long>
map, ClientCnx cnx) {
+ MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+ final CompletableFuture<Void> completableFuture;
+ // cumulative ack chunk by the last messageId
+ if (chunkMsgIds != null && ackType != AckType.Cumulative) {
+ if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(),
cMsgId.getEntryId(), null));
}
}
- ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId,
entriesToAck);
- if (flush) {
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ if (ackResponseEnabled) {
+ long requestId = consumer.getClient().newRequestId();
+ ByteBuf cmd =
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+ completableFuture = cnx.newAckForResponse(cmd, requestId);
} else {
- cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ ByteBuf cmd =
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ completableFuture =
CompletableFuture.completedFuture(null);
}
} else {
+ // if don't support multi message ack, it also support ack
response, so we should not think about the
+ // ack response in this logic
Review comment:
this is the chunk message, so if want to use ack response, broker must
support multi message ack, we don't need to support broker don't support multi
message ack and then we return ack response.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]