BewareMyPower commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3395383670
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -471,6 +472,17 @@ public CompletableFuture<Void>
acknowledgeMessageAsync(List<Position> positions,
.attr("position", position)
.log("Cumulative ack on");
AckCallback callback = new AckCallback(previousMarkDeletePosition,
future);
+ if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer
singleConsumerDispatcher) {
+ // For compacted consumer, we should ignore the position that
does not exist in the managed ledger,
+ // otherwise, the `asyncMarkDelete` call could jump the read
position to the active ledger, which will
+ // skip all entries present in the compacted ledger but not
present in the managed ledger.
+ final var consumer =
singleConsumerDispatcher.getActiveConsumer();
+ if (consumer != null
+ && consumer.readCompacted()
+ &&
cursor.getManagedLedger().getOptionalLedgerInfo(position.getLedgerId()).isEmpty())
{
Review Comment:
The returned future is mostly ignored, I'd like to add a warning log instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]