BewareMyPower commented on code in PR #21928:
URL: https://github.com/apache/pulsar/pull/21928#discussion_r1458247191
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -231,29 +231,31 @@ private CompletableFuture<Void>
addAcknowledgment(MessageIdAdv msgId,
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
- __ -> doIndividualAck(__, properties),
- __ -> doIndividualBatchAck(__, properties));
+ __ -> doIndividualAck(__, properties, false),
+ __ -> doIndividualBatchAck(__, properties, false));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null ||
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
- return doCumulativeAck(msgId, properties, null);
+ return doCumulativeAck(msgId, properties, null, false);
} else if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId,
properties);
+ return doCumulativeBatchIndexAck(batchMessageId,
properties, false);
} else {
-
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null);
+
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null, false);
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
- private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
+ private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties,
+ boolean
queueDueToConnecting) {
Review Comment:
Method invocations like `f(args..., true)` and `f(args..., false)` are hard
to read unless you jump to the implementation of `f` and see what the boolean
parameter means.
You should add a new method like `queueIndividualAck` and call it directly.
For example,
```java
private CompletableFuture<Void> queueIndividualAck(MessageIdAdv
messageId) {
Optional<Lock> readLock = acquireReadLock();
try {
doIndividualAckAsync(messageId);
return readLock.map(__ ->
currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null));
} finally {
readLock.ifPresent(Lock::unlock);
if (pendingIndividualAcks.size() >= maxAckGroupSize) {
flush();
}
}
}
private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction
subscription.
return doImmediateAck(messageId, AckType.Individual, properties,
null);
} else {
return queueIndividualAck(messageId);
}
}
```
Then you don't have to add `false` argument to all existing
`doIndividualAck` calls. And in `doImmediateAck`, you can call
`queueIndividualAck` directly.
```java
private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId,
AckType ackType, Map<String, Long> properties,
BitSetRecyclable bitSet) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null && consumer.getState() ==
HandlerState.State.Connecting) {
if (ackType == AckType.Cumulative) {
return queueCumulativeAck(msgId);
} else {
return queueIndividualAck(msgId);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]