poorbarcode commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1461093751
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void>
addAcknowledgment(MessageIdAdv msgId,
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
- __ -> doIndividualAck(__, properties),
- __ -> doIndividualBatchAck(__, properties));
+ __ -> doIndividualAck(__, properties, false),
+ __ -> doIndividualBatchAck(__, properties, false));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null ||
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
- return doCumulativeAck(msgId, properties, null);
+ return doCumulativeAck(msgId, properties, null, false);
} else if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId,
properties);
+ return doCumulativeBatchIndexAck(batchMessageId,
properties, false);
} else {
-
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null);
+
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null, false);
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
- private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties,
+ boolean
queueDueToConnecting) {
+ if (!queueDueToConnecting
+ && (acknowledgementGroupTimeMicros == 0 || (properties != null
&& !properties.isEmpty()))) {
Review Comment:
Agress with @BewareMyPower
I am trying to split this PR into the following.
- part-1: add a new component `AcknowledgmentCache` to support caching the
acknowledgments which include the arg `properties.`
- part-2: split `PersistentAcknowledgmentsGroupingTracker` into two
implementations:
- cache and batch the acks
- immediately ack
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]