codelipenghui commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r559544860



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -255,6 +255,7 @@ enum ProtocolVersion {
               // Added Key_Shared subscription
     v15 = 15; // Add CommandGetOrCreateSchema and 
CommandGetOrCreateSchemaResponse
     v16 = 16; // Add support for raw message metadata
+    v17 = 17; // Added support ack response

Review comment:
       Please check all "ack response" related code and comments, and unify 
them to "ack receipt".

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +123,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType 
ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> 
messageIds,
+                                                         AckType ackType, 
Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackReceiptEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
+                messageIds.forEach(messageId ->
+                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackReceiptEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the 
currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, 
because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || 
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                pendingIndividualAcks.add(new 
MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = 
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, 
Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
msgId;
-                    doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+                                                     Map<String, Long> 
properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all 
ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = 
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack 
command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction 
subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new 
MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (batchMessageId.ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchIndexAck(batchMessageId, 
properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the 
batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && 
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatesInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int 
batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, 
TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, 
properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, 
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new 
ConcurrentHashMap<>());
-                    bitSet = 
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl 
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex());
+        
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    }
+
+    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl 
messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, 
Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction 
subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, 
null);
+        } else {
+            if (ackReceiptEnabled) {

Review comment:
       It's better to use a method isAckReceiptEnabled() to check whether the 
consumer has enabled the ack receipt and the broker supports the ack receipt.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to