codelipenghui commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r559544860
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -255,6 +255,7 @@ enum ProtocolVersion {
// Added Key_Shared subscription
v15 = 15; // Add CommandGetOrCreateSchema and
CommandGetOrCreateSchemaResponse
v16 = 16; // Add support for raw message metadata
+ v17 = 17; // Added support ack response
Review comment:
Please check all "ack response"-related code and comments, and unify them to
"ack receipt".
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +123,302 @@ public boolean isDuplicate(MessageId messageId) {
}
@Override
- public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType
ackType, Map<String, Long> properties) {
- if (ackType == AckType.Cumulative) {
- messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
- return;
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds,
+ AckType ackType,
Map<String, Long> properties) {
+ if (AckType.Cumulative.equals(ackType)) {
+ if (ackReceiptEnabled) {
+ Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
+ messageIds.forEach(messageId ->
+
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
+ } else {
+ messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ if (ackReceiptEnabled) {
+ try {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
+ addListAcknowledgment(messageIds);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ }
+ } else {
+ addListAcknowledgment(messageIds);
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
- messageIds.forEach(messageId -> {
+ }
+
+ private void addListAcknowledgment(List<MessageId> messageIds) {
+ for (MessageId messageId : messageIds) {
+ consumer.onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- pendingIndividualAcks.add(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ if (!batchMessageId.ackIndividual()) {
+ doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+ } else {
+ messageId =
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
+ }
} else {
- pendingIndividualAcks.add(messageId);
- }
- pendingIndividualBatchIndexAcks.remove(messageId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
}
- });
- if (acknowledgementGroupTimeMicros == 0) {
- flush();
}
}
@Override
- public void addAcknowledgment(MessageIdImpl msgId, AckType ackType,
Map<String, Long> properties,
- TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
- (txn != null && ackType == AckType.Cumulative)) {
- if (msgId instanceof BatchMessageIdImpl && txn != null) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
msgId;
- doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
- batchMessageId.getBatchIndex(),
- ackType, properties, txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits());
- return;
+ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
+ Map<String, Long>
properties) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ // ack this ack carry bitSet index and judge bit set are all
ack
+ if (batchMessageId.ackIndividual()) {
+ MessageIdImpl messageId =
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+ return doIndividualAck(messageId, properties);
+ } else if (batchIndexAckEnabled){
+ return doIndividualBatchAck(batchMessageId, properties);
+ } else {
+ // if we prevent batchIndexAck, we can't send the ack
command to broker when the batch message are
+ // all ack complete
+ return CompletableFuture.completedFuture(null);
}
- // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
- // uncommon condition since it's only used for the compaction
subscription.
- doImmediateAck(msgId, ackType, properties, txn);
- } else if (ackType == AckType.Cumulative) {
- doCumulativeAck(msgId, null);
- } else {
- // Individual ack
- if (msgId instanceof BatchMessageIdImpl) {
- pendingIndividualAcks.add(new
MessageIdImpl(msgId.getLedgerId(),
- msgId.getEntryId(), msgId.getPartitionIndex()));
} else {
- if (txn != null) {
- pendingIndividualTransactionAcks
- .add(Triple.of(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits(), msgId));
+ consumer.onAcknowledgeCumulative(msgId, null);
+ if (batchMessageId.ackCumulative()) {
+ return doCumulativeAck(msgId, properties, null);
} else {
- pendingIndividualAcks.add(msgId);
+ if (batchIndexAckEnabled) {
+ return doCumulativeBatchIndexAck(batchMessageId,
properties);
+ } else {
+ // ack the pre messageId, because we prevent the
batchIndexAck, we can ensure pre messageId can
+ // ack
+ if (AckType.Cumulative == ackType
+ &&
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
}
- pendingIndividualBatchIndexAcks.remove(msgId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ } else {
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ modifyMessageIdStatesInConsumer(msgId);
+ return doIndividualAck(msgId, properties);
+ } else {
+ consumer.onAcknowledgeCumulative(msgId, null);
+ return doCumulativeAck(msgId, properties, null);
}
}
}
- public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int
batchIndex, int batchSize, AckType ackType,
- Map<String, Long> properties,
TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
- doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType,
properties,
- txn == null ? -1 : txn.getTxnIdMostBits(),
- txn == null ? -1 : txn.getTxnIdLeastBits());
- } else if (ackType == AckType.Cumulative) {
- BitSetRecyclable bitSet = BitSetRecyclable.create();
- bitSet.set(0, batchSize);
- bitSet.clear(0, batchIndex + 1);
- doCumulativeAck(msgId, bitSet);
- } else if (ackType == AckType.Individual) {
- ConcurrentBitSetRecyclable bitSet;
- if (txn != null) {
- synchronized (txn) {
- ConcurrentHashMap<MessageIdImpl,
ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
- pendingIndividualTransactionBatchIndexAcks
- .computeIfAbsent(txn, (v) -> new
ConcurrentHashMap<>());
- bitSet =
transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
- ConcurrentBitSetRecyclable value;
- value = ConcurrentBitSetRecyclable.create();
- value.set(0, msgId.getAcker().getBatchSize());
- return value;
- });
- bitSet.clear(batchIndex);
+ private MessageIdImpl
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
+ MessageIdImpl messageId = new
MessageIdImpl(batchMessageId.getLedgerId(),
+ batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
+
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+ clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ return messageId;
+ }
+
+ private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
+ consumer.getStats().incrementNumAcksSent(1);
+ clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ }
+
+ private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl
messageId) {
+ consumer.getUnAckedMessageTracker().remove(messageId);
+ if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+ }
+ }
+
+ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId,
Map<String, Long> properties) {
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
+ // uncommon condition since it's only used for the compaction
subscription.
+ return doImmediateAck(messageId, AckType.Individual, properties,
null);
+ } else {
+ if (ackReceiptEnabled) {
Review comment:
It's better to add a method `isAckReceiptEnabled()` that checks whether the
consumer has enabled the ack receipt and the broker supports it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]