lhotari commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979686788
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void>
addAcknowledgment(MessageIdImpl msgId, AckType ac
Map<String, Long>
properties) {
if (msgId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
- if (ackType == AckType.Individual) {
- consumer.onAcknowledge(msgId, null);
- // ack this ack carry bitSet index and judge bit set are all
ack
- if (batchMessageId.ackIndividual()) {
- MessageIdImpl messageId =
modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
- return doIndividualAck(messageId, properties);
- } else if (batchIndexAckEnabled){
- return doIndividualBatchAck(batchMessageId, properties);
- } else {
- // if we prevent batchIndexAck, we can't send the ack
command to broker when the batch message are
- // all ack complete
- return CompletableFuture.completedFuture(null);
- }
- } else {
- consumer.onAcknowledgeCumulative(msgId, null);
- if (batchMessageId.ackCumulative()) {
- return doCumulativeAck(msgId, properties, null);
- } else {
- if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId,
properties);
- } else {
- // ack the pre messageId, because we prevent the
batchIndexAck, we can ensure pre messageId can
- // ack
- if (AckType.Cumulative == ackType
- &&
!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
- }
- return CompletableFuture.completedFuture(null);
- }
- }
- }
+ return addAcknowledgment(batchMessageId.toMessageIdImpl(),
ackType, properties, batchMessageId);
} else {
- if (ackType == AckType.Individual) {
- consumer.onAcknowledge(msgId, null);
- modifyMessageIdStatesInConsumer(msgId);
- return doIndividualAck(msgId, properties);
- } else {
- consumer.onAcknowledgeCumulative(msgId, null);
- return doCumulativeAck(msgId, properties, null);
- }
+ return addAcknowledgment(msgId, ackType, properties, null);
}
}
- private MessageIdImpl
modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
- MessageIdImpl messageId = new
MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
-
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
- clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
- return messageId;
- }
-
- private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
- consumer.getStats().incrementNumAcksSent(1);
- clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ private CompletableFuture<Void> addIndividualAcknowledgment(
+ MessageIdImpl msgId,
+ @Nullable BatchMessageIdImpl batchMessageId,
+ Function<MessageIdImpl, CompletableFuture<Void>>
individualAckFunction,
+ Function<BatchMessageIdImpl, CompletableFuture<Void>>
batchAckFunction) {
+ consumer.onAcknowledge(msgId, null);
Review Comment:
there's a change in behavior in calling `onAcknowledge`. Previous, the
original message id instance got passed. Now some information is potentially
lost in the case of a batch message id.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]