BewareMyPower commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1458251277
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
- __ -> doIndividualAck(__, properties),
- __ -> doIndividualBatchAck(__, properties));
+ __ -> doIndividualAck(__, properties, false),
+ __ -> doIndividualBatchAck(__, properties, false));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
- return doCumulativeAck(msgId, properties, null);
+ return doCumulativeAck(msgId, properties, null, false);
} else if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId, properties);
+ return doCumulativeBatchIndexAck(batchMessageId, properties, false);
} else {
- doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null);
+ doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null, false);
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
- private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+ private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties,
+ boolean queueDueToConnecting) {
+ if (!queueDueToConnecting
+ && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
Review Comment:
@codelipenghui Yes. But this behavior was only added for the
`TwoPhaseCompactor` to avoid latency when seeking; the client side has no way
to attach properties. See
https://github.com/apache/pulsar/blob/55520bd810d29991d4af2124151c8d75c25b0800/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L229-L230
Currently, if `acknowledge` is called during reconnection,
`phaseTwoSeekThenLoop` will fail.
From my perspective, we should also queue these ACK requests and flush them
once the connection is established. @poorbarcode
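
To make the "queue, then flush" suggestion concrete, here is a minimal sketch of the idea. It is not the actual `PersistentAcknowledgmentsGroupingTracker` code: the class `QueueingAckSketch`, its `onConnected`/`onDisconnected` hooks, the `long` message id, and the in-memory `sent` list are all hypothetical simplifications chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for an ACK tracker; not Pulsar's real class.
class QueueingAckSketch {
    // One queued ACK plus the future handed back to the caller.
    private static final class PendingAck {
        final long messageId;
        final Map<String, Long> properties;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        PendingAck(long messageId, Map<String, Long> properties) {
            this.messageId = messageId;
            this.properties = properties;
        }
    }

    private final List<PendingAck> pending = new ArrayList<>();
    private final List<Long> sent = new ArrayList<>(); // stands in for the wire
    private boolean connected;

    // Sends immediately when connected; otherwise queues the ACK, even if it
    // carries properties, instead of failing the caller.
    synchronized CompletableFuture<Void> acknowledge(long messageId, Map<String, Long> properties) {
        if (connected) {
            send(messageId, properties);
            return CompletableFuture.completedFuture(null);
        }
        PendingAck ack = new PendingAck(messageId, properties);
        pending.add(ack);
        return ack.future;
    }

    // Called once the connection is (re)established: flush everything queued, in order.
    synchronized void onConnected() {
        connected = true;
        for (PendingAck ack : pending) {
            send(ack.messageId, ack.properties);
            ack.future.complete(null);
        }
        pending.clear();
    }

    synchronized void onDisconnected() {
        connected = false;
    }

    // Assumption: a real implementation would write an ACK command to the broker here.
    private void send(long messageId, Map<String, Long> properties) {
        sent.add(messageId);
    }

    synchronized List<Long> sentMessageIds() {
        return new ArrayList<>(sent);
    }
}
```

With this shape, a caller such as the compactor would get back a future that completes only after reconnection, rather than an immediate failure. A real change would also need to decide what happens to queued ACKs if the consumer is closed before it reconnects.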