eolivelli commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r549152589
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
}
@Override
- public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl>
messageIds,
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds,
AckType ackType,
Map<String, Long> properties) {
- if (ackType == AckType.Cumulative) {
- messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
- return CompletableFuture.completedFuture(null);
+ if (AckType.Cumulative.equals(ackType)) {
+ if (ackResponseEnabled) {
+ Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
+ messageIds.forEach(messageId ->
+
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
+ } else {
+ messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ if (ackResponseEnabled) {
+ try {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
+ addListAcknowledgment(messageIds);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ }
+ } else {
+ addListAcknowledgment(messageIds);
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
- messageIds.forEach(messageId -> {
+ }
+
+ private void addListAcknowledgment(List<MessageId> messageIds) {
+ for (MessageId messageId : messageIds) {
+ consumer.onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- pendingIndividualAcks.add(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ if (!batchMessageId.ackIndividual()) {
+ doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+ } else {
+ messageId =
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
+ }
} else {
- pendingIndividualAcks.add(messageId);
- }
- pendingIndividualBatchIndexAcks.remove(messageId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
}
- });
- if (acknowledgementGroupTimeMicros == 0) {
- flush();
}
- return CompletableFuture.completedFuture(null);
}
@Override
- public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType, Map<String, Long> properties,
- TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
- (txn != null && ackType == AckType.Cumulative)) {
- if (msgId instanceof BatchMessageIdImpl && txn != null) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
msgId;
- doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
- batchMessageId.getBatchIndex(),
- ackType, properties, txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits());
+ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
+ Map<String, Long>
properties) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ // ack this ack carry bitSet index and judge bit set are all
ack
+ if (batchMessageId.ackIndividual()) {
+ MessageIdImpl messageId =
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+ return doIndividualAck(messageId, properties);
+ } else if (batchIndexAckEnabled){
+ return doIndividualBatchAck(batchMessageId, properties);
+ } else {
+ // if we prevent batchIndexAck, we can't send the ack
command to broker when the batch message are
+ // all ack complete
return CompletableFuture.completedFuture(null);
}
- // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
- // uncommon condition since it's only used for the compaction
subscription.
- doImmediateAck(msgId, ackType, properties, txn);
- } else if (ackType == AckType.Cumulative) {
- doCumulativeAck(msgId, null);
- } else {
- // Individual ack
- if (msgId instanceof BatchMessageIdImpl) {
- pendingIndividualAcks.add(new
MessageIdImpl(msgId.getLedgerId(),
- msgId.getEntryId(), msgId.getPartitionIndex()));
} else {
- if (txn != null) {
- pendingIndividualTransactionAcks
- .add(Triple.of(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits(), msgId));
+ consumer.onAcknowledgeCumulative(msgId, null);
+ if (((BatchMessageIdImpl) msgId).ackCumulative()) {
Review comment:
What happens if msgId is not an instance of this class? Should we add an
*instanceof* check before the cast?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
}
@Override
- public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl>
messageIds,
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds,
AckType ackType,
Map<String, Long> properties) {
- if (ackType == AckType.Cumulative) {
- messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
- return CompletableFuture.completedFuture(null);
+ if (AckType.Cumulative.equals(ackType)) {
+ if (ackResponseEnabled) {
+ Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
+ messageIds.forEach(messageId ->
+
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
+ } else {
+ messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ if (ackResponseEnabled) {
+ try {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
Review comment:
Move this line out of the try block: acquire the lock before entering the
try, so that if lock() itself fails, the finally block does not call
unlock() on a lock that was never acquired.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
}
@Override
- public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl>
messageIds,
+ public CompletableFuture<Void> addListAcknowledgment(List<MessageId>
messageIds,
AckType ackType,
Map<String, Long> properties) {
- if (ackType == AckType.Cumulative) {
- messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
- return CompletableFuture.completedFuture(null);
+ if (AckType.Cumulative.equals(ackType)) {
+ if (ackResponseEnabled) {
+ Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
+ messageIds.forEach(messageId ->
+
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
+ } else {
+ messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ if (ackResponseEnabled) {
+ try {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
+ addListAcknowledgment(messageIds);
+ return this.currentIndividualAckFuture;
+ } finally {
+ this.lock.readLock().unlock();
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ }
+ } else {
+ addListAcknowledgment(messageIds);
+ if (acknowledgementGroupTimeMicros == 0 ||
pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
- messageIds.forEach(messageId -> {
+ }
+
+ private void addListAcknowledgment(List<MessageId> messageIds) {
+ for (MessageId messageId : messageIds) {
+ consumer.onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- pendingIndividualAcks.add(new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ if (!batchMessageId.ackIndividual()) {
+ doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+ } else {
+ messageId =
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
+ }
} else {
- pendingIndividualAcks.add(messageId);
- }
- pendingIndividualBatchIndexAcks.remove(messageId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+ doIndividualAckAsync((MessageIdImpl) messageId);
}
- });
- if (acknowledgementGroupTimeMicros == 0) {
- flush();
}
- return CompletableFuture.completedFuture(null);
}
@Override
- public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType, Map<String, Long> properties,
- TransactionImpl txn) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
- (txn != null && ackType == AckType.Cumulative)) {
- if (msgId instanceof BatchMessageIdImpl && txn != null) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
msgId;
- doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
- batchMessageId.getBatchIndex(),
- ackType, properties, txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits());
+ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
+ Map<String, Long>
properties) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ // ack this ack carry bitSet index and judge bit set are all
ack
+ if (batchMessageId.ackIndividual()) {
+ MessageIdImpl messageId =
modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+ return doIndividualAck(messageId, properties);
+ } else if (batchIndexAckEnabled){
+ return doIndividualBatchAck(batchMessageId, properties);
+ } else {
+ // if we prevent batchIndexAck, we can't send the ack
command to broker when the batch message are
+ // all ack complete
return CompletableFuture.completedFuture(null);
}
- // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
- // uncommon condition since it's only used for the compaction
subscription.
- doImmediateAck(msgId, ackType, properties, txn);
- } else if (ackType == AckType.Cumulative) {
- doCumulativeAck(msgId, null);
- } else {
- // Individual ack
- if (msgId instanceof BatchMessageIdImpl) {
- pendingIndividualAcks.add(new
MessageIdImpl(msgId.getLedgerId(),
- msgId.getEntryId(), msgId.getPartitionIndex()));
} else {
- if (txn != null) {
- pendingIndividualTransactionAcks
- .add(Triple.of(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits(), msgId));
+ consumer.onAcknowledgeCumulative(msgId, null);
+ if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+ return doCumulativeAck(msgId, properties, null);
} else {
- pendingIndividualAcks.add(msgId);
+ if (batchIndexAckEnabled) {
+ return doCumulativeBatchAck(batchMessageId,
properties);
+ } else {
+ // ack the pre messageId, because we prevent the
batchIndexAck, we can ensure pre messageId can
+ // ack
+ if (AckType.Cumulative == ackType
+ &&
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
}
}
- pendingIndividualBatchIndexAcks.remove(msgId);
- if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
+ } else {
+ if (ackType == AckType.Individual) {
+ consumer.onAcknowledge(msgId, null);
+ modifyMessageIdStatusInConsumer(msgId);
+ return doIndividualAck(msgId, properties);
+ } else {
+ consumer.onAcknowledgeCumulative(msgId, null);
+ return doCumulativeAck(msgId, properties, null);
}
}
- return CompletableFuture.completedFuture(null);
}
- @Override
- public CompletableFuture<Void>
addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int
batchSize, AckType ackType,
- Map<String, Long> properties,
TransactionImpl txn) {
- if (batchIndexAckEnabled) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
- doImmediateBatchIndexAck(msgId, batchIndex, batchSize,
ackType, properties,
- txn == null ? -1 : txn.getTxnIdMostBits(),
- txn == null ? -1 : txn.getTxnIdLeastBits());
- } else if (ackType == AckType.Cumulative) {
- BitSetRecyclable bitSet = BitSetRecyclable.create();
- bitSet.set(0, batchSize);
- bitSet.clear(0, batchIndex + 1);
- doCumulativeAck(msgId, bitSet);
- } else if (ackType == AckType.Individual) {
- ConcurrentBitSetRecyclable bitSet;
- bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
- new MessageIdImpl(msgId.getLedgerId(),
msgId.getEntryId(),
- msgId.getPartitionIndex()), (v) -> {
- ConcurrentBitSetRecyclable value;
- if (msgId.getAcker() != null &&
- !(msgId.getAcker() instanceof
BatchMessageAckerDisabled)) {
- value =
ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
- } else {
- value = ConcurrentBitSetRecyclable.create();
- value.set(0, batchSize);
- }
- return value;
- });
- bitSet.clear(batchIndex);
- if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
+ private MessageIdImpl
modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+ MessageIdImpl messageId = new
MessageIdImpl(batchMessageId.getLedgerId(),
+ batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
+
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+ modifyMessageIdStatusInConsumerCommon(messageId);
+ return messageId;
+ }
+
+ private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+ consumer.getStats().incrementNumAcksSent(1);
+ modifyMessageIdStatusInConsumerCommon(messageId);
+ }
+
+ private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl
messageId) {
+ consumer.getUnAckedMessageTracker().remove(messageId);
+ if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+ }
+ }
+
+ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId,
Map<String, Long> properties) {
+ if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ // We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
+ // uncommon condition since it's only used for the compaction
subscription.
+ return doImmediateAck(messageId, AckType.Individual, properties,
null);
+ } else {
+ if (ackResponseEnabled) {
+ try {
+ // when flush the ack, we should bind the this ack in the
currentFuture, during this time we can't
+ // change currentFuture. but we can lock by the read lock,
because the currentFuture is not change
+ // any ack operation is allowed.
+ this.lock.readLock().lock();
Review comment:
Move this line out of the try block (same as above): acquire the lock
before entering the try, so the finally-block unlock() never runs on a
lock that was not acquired.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]