codelipenghui commented on a change in pull request #8037: URL: https://github.com/apache/pulsar/pull/8037#discussion_r489316644
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java ########## @@ -326,6 +388,86 @@ public void flush() { } } + if (!pendingIndividualTransactionAcks.isEmpty()) { + if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { + // We can send 1 single protobuf command with all individual acks + while (true) { + Triple<Long, Long, MessageIdImpl> entry = pendingIndividualTransactionAcks.pollFirst(); + if (entry == null) { + break; + } + + // if messageId is checked then all the chunked related to that msg also processed so, ack all of + // them + MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(entry.getRight()); + long mostSigBits = entry.getLeft(); + long leastSigBits = entry.getMiddle(); + MessageIdImpl messageId = entry.getRight(); + if (chunkMsgIds != null && chunkMsgIds.length > 1) { + for (MessageIdImpl cMsgId : chunkMsgIds) { + if (cMsgId != null) { + newAckCommand(consumer.consumerId, cMsgId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits); + } + } + // messages will be acked so, remove checked message sequence + this.consumer.unAckedChunckedMessageIdSequenceMap.remove(messageId); + } else { + newAckCommand(consumer.consumerId, messageId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits); + } + } + } else { + // When talking to older brokers, send the acknowledgements individually + while (true) { + Triple<Long, Long, MessageIdImpl> entry = pendingIndividualTransactionAcks.pollFirst(); + if (entry == null) { + break; + } + + newAckCommand(consumer.consumerId, entry.getRight(), null, AckType.Individual, + null, Collections.emptyMap(), cnx, false, entry.getLeft(), entry.getMiddle()); + shouldFlush = true; + } + } + } + + if (!pendingIndividualTransactionBatchIndexAcks.isEmpty()) { + Iterator<Map.Entry<TransactionImpl, 
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>> transactionIterator = pendingIndividualTransactionBatchIndexAcks.entrySet().iterator(); + while (transactionIterator.hasNext()) { + Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>> transactionEntry = transactionIterator.next(); + TransactionImpl txn = transactionEntry.getKey(); + synchronized (txn) { + if (pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) { + List<Triple<Long, Long, ConcurrentBitSetRecyclable>> messageIdBitSetList = new ArrayList<>(); + transactionEntriesToAck.put(txn, messageIdBitSetList); + Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> messageIdIterator = transactionEntry.getValue().entrySet().iterator(); + while (messageIdIterator.hasNext()) { + Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next(); + ConcurrentBitSetRecyclable concurrentBitSetRecyclable = + ConcurrentBitSetRecyclable.create(messageIdEntry.getValue()); + MessageIdImpl messageId = messageIdEntry.getKey(); + messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId, concurrentBitSetRecyclable)); + messageIdEntry.getValue().set(0, messageIdEntry.getValue().size()); + messageIdIterator.remove(); + } + transactionIterator.remove(); + pendingIndividualTransactionBatchIndexAcks.remove(transactionEntry.getKey()); Review comment: It looks like we don't need to remove the entry again here: `transactionIterator.remove()` on the previous line has already removed it from `pendingIndividualTransactionBatchIndexAcks`, so the explicit `remove(transactionEntry.getKey())` call is redundant. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org