congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557844899
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
}
}
- private void newAckCommand(long consumerId, MessageIdImpl msgId,
BitSetRecyclable lastCumulativeAckSet,
- AckType ackType, ValidationError validationError, Map<String,
Long> map, ClientCnx cnx,
- boolean flush, long txnidMostBits, long
txnidLeastBits) {
-
- MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
- if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
- if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
- && ackType != AckType.Cumulative) {
+ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId,
MessageIdImpl msgId,
+ BitSetRecyclable
bitSet, AckType ackType,
+ Map<String, Long>
map, ClientCnx cnx) {
+ MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+ final CompletableFuture<Void> completableFuture;
+ // cumulative ack chunk by the last messageId
+ if (chunkMsgIds != null && ackType != AckType.Cumulative) {
+ if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(),
cMsgId.getEntryId(), null));
}
}
- ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId,
entriesToAck);
- if (flush) {
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ if (ackResponseEnabled) {
+ long requestId = consumer.getClient().newRequestId();
+ ByteBuf cmd =
Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+ completableFuture = cnx.newAckForResponse(cmd, requestId);
Review comment:
Because this method is newImmediateAckAndFlush, we should write the command
to the broker immediately.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org