BewareMyPower commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1461256290
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void>
addAcknowledgment(MessageIdAdv msgId,
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
- __ -> doIndividualAck(__, properties),
- __ -> doIndividualBatchAck(__, properties));
+ __ -> doIndividualAck(__, properties, false),
+ __ -> doIndividualBatchAck(__, properties, false));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null ||
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
- return doCumulativeAck(msgId, properties, null);
+ return doCumulativeAck(msgId, properties, null, false);
} else if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId,
properties);
+ return doCumulativeBatchIndexAck(batchMessageId,
properties, false);
} else {
-
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null);
+
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null, false);
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
- private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties,
+ boolean
queueDueToConnecting) {
+ if (!queueDueToConnecting
+ && (acknowledgementGroupTimeMicros == 0 || (properties != null
&& !properties.isEmpty()))) {
Review Comment:
> split PersistentAcknowledgmentsGroupingTracker into two implementations
It's reasonable. BTW, the design of the C++ client is better that:
-
[AckGroupingEnabled](https://github.com/apache/pulsar-client-cpp/blob/main/lib/AckGroupingTrackerEnabled.cc):
cache the acknowledgments and flush them due to the timeout or count
-
[AckGrouingDisabled](https://github.com/apache/pulsar-client-cpp/blob/main/lib/AckGroupingTrackerDisabled.cc):
acknowledge immediately
https://github.com/apache/pulsar-client-cpp/blob/72b7311aeef32e28a28e926da686aaf948e8f948/lib/ConsumerImpl.cc#L201C1-L215C6
Though the naming is not good (but consistent with other impl classes in the
library)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]