BewareMyPower commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3396302887


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -471,6 +472,22 @@ public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions,
                     .attr("position", position)
                     .log("Cumulative ack on");
             AckCallback callback = new AckCallback(previousMarkDeletePosition, future);
+            if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer singleConsumerDispatcher) {
+                // For compacted consumer, we should ignore the position that does not exist in the managed ledger,
+                // otherwise, the `asyncMarkDelete` call could jump the read position to the active ledger, which will
+                // skip all entries present in the compacted ledger but not present in the managed ledger.
+                final var consumer = singleConsumerDispatcher.getActiveConsumer();
+                final var ml = cursor.getManagedLedger();
+                if (consumer != null
+                        && consumer.readCompacted()
+                        && ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) {
+                    if (ml.getFirstPosition() == null || position.getLedgerId() > ml.getFirstPosition().getLedgerId()) {
+                        log.warn("Received an ACK whose position is " + position + ", valid ledgers: "
+                                + ml.getLedgersInfo().keySet());
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }

Review Comment:
   Yes, applied.
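
   For readers following the thread, the guard in the hunk above can be
   sketched in isolation. This is a minimal illustration, not the Pulsar
   implementation: `CompactedAckGuard`, `shouldIgnoreAck`, and
   `isUnexpected` are hypothetical names, the managed ledger is reduced to
   a sorted set of ledger ids, and the `getFirstPosition()` comparison is
   approximated by comparing against the smallest known ledger id.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the check in the diff; none of these names are Pulsar APIs.
final class CompactedAckGuard {

    // A cumulative ack is dropped when the consumer reads compacted data and the
    // acked ledger id is not among the ledgers the managed ledger still knows about.
    static boolean shouldIgnoreAck(boolean readCompacted, NavigableSet<Long> ledgerIds, long ackedLedgerId) {
        return readCompacted && !ledgerIds.contains(ackedLedgerId);
    }

    // Assumed analogue of the warning condition: an unknown ledger id that is not older
    // than the first known ledger cannot be explained by compaction, so it is worth logging.
    static boolean isUnexpected(NavigableSet<Long> ledgerIds, long ackedLedgerId) {
        return ledgerIds.isEmpty() || ackedLedgerId > ledgerIds.first();
    }

    public static void main(String[] args) {
        NavigableSet<Long> ledgers = new TreeSet<>(List.of(10L, 11L));
        long acked = 5L; // e.g. a position that only exists in the compacted ledger
        if (shouldIgnoreAck(true, ledgers, acked)) {
            if (isUnexpected(ledgers, acked)) {
                System.out.println("Unexpected ack on ledger " + acked + ", valid ledgers: " + ledgers);
            }
            System.out.println("Ack ignored; mark-delete position left unchanged");
        }
    }
}
```

   In this sketch, an ack on a ledger older than every known ledger is
   dropped silently, while an unknown ledger id above the first known one
   is dropped with a warning, mirroring the two branches in the diff.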



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
