codelipenghui commented on a change in pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#discussion_r489316644



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -326,6 +388,86 @@ public void flush() {
             }
         }
 
+        if (!pendingIndividualTransactionAcks.isEmpty()) {
+            if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
+                // We can send 1 single protobuf command with all individual 
acks
+                while (true) {
+                    Triple<Long, Long, MessageIdImpl> entry = 
pendingIndividualTransactionAcks.pollFirst();
+                    if (entry == null) {
+                        break;
+                    }
+
+                    // if messageId is checked then all the chunked related to 
that msg also processed so, ack all of
+                    // them
+                    MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunckedMessageIdSequenceMap.get(entry.getRight());
+                    long mostSigBits = entry.getLeft();
+                    long leastSigBits = entry.getMiddle();
+                    MessageIdImpl messageId = entry.getRight();
+                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+                        for (MessageIdImpl cMsgId : chunkMsgIds) {
+                            if (cMsgId != null) {
+                                newAckCommand(consumer.consumerId, cMsgId, 
null, AckType.Individual, null, Collections.emptyMap(), cnx, false, 
mostSigBits, leastSigBits);
+                            }
+                        }
+                        // messages will be acked so, remove checked message 
sequence
+                        
this.consumer.unAckedChunckedMessageIdSequenceMap.remove(messageId);
+                    } else {
+                        newAckCommand(consumer.consumerId, messageId, null, 
AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, 
leastSigBits);
+                    }
+                }
+            } else {
+                // When talking to older brokers, send the acknowledgements 
individually
+                while (true) {
+                    Triple<Long, Long, MessageIdImpl> entry = 
pendingIndividualTransactionAcks.pollFirst();
+                    if (entry == null) {
+                        break;
+                    }
+
+                    newAckCommand(consumer.consumerId, entry.getRight(), null, 
AckType.Individual,
+                            null, Collections.emptyMap(), cnx, false, 
entry.getLeft(), entry.getMiddle());
+                    shouldFlush = true;
+                }
+            }
+        }
+
+        if (!pendingIndividualTransactionBatchIndexAcks.isEmpty()) {
+            Iterator<Map.Entry<TransactionImpl, 
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>> 
transactionIterator = 
pendingIndividualTransactionBatchIndexAcks.entrySet().iterator();
+            while (transactionIterator.hasNext()) {
+                Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable>> transactionEntry = transactionIterator.next();
+                TransactionImpl txn = transactionEntry.getKey();
+                synchronized (txn) {
+                    if 
(pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) {
+                        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
messageIdBitSetList = new ArrayList<>();
+                        transactionEntriesToAck.put(txn, messageIdBitSetList);
+                        Iterator<Map.Entry<MessageIdImpl, 
ConcurrentBitSetRecyclable>> messageIdIterator = 
transactionEntry.getValue().entrySet().iterator();
+                        while (messageIdIterator.hasNext()) {
+                            Map.Entry<MessageIdImpl, 
ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next();
+                            ConcurrentBitSetRecyclable 
concurrentBitSetRecyclable =
+                                    
ConcurrentBitSetRecyclable.create(messageIdEntry.getValue());
+                            MessageIdImpl messageId = messageIdEntry.getKey();
+                            
messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId, 
concurrentBitSetRecyclable));
+                            messageIdEntry.getValue().set(0, 
messageIdEntry.getValue().size());
+                            messageIdIterator.remove();
+                        }
+                        transactionIterator.remove();
+                        
pendingIndividualTransactionBatchIndexAcks.remove(transactionEntry.getKey());

Review comment:
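       It looks like we don't need to remove the entry again here: `transactionIterator.remove()` on the preceding line already removes it from `pendingIndividualTransactionBatchIndexAcks`, so this extra `remove(...)` call is redundant.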




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to