poorbarcode commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3395195700


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -471,6 +472,17 @@ public CompletableFuture<Void> 
acknowledgeMessageAsync(List<Position> positions,
                     .attr("position", position)
                     .log("Cumulative ack on");
             AckCallback callback = new AckCallback(previousMarkDeletePosition, 
future);
+            if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer 
singleConsumerDispatcher) {
+                // For compacted consumer, we should ignore the position that 
does not exist in the managed ledger,
+                // otherwise, the `asyncMarkDelete` call could jump the read 
position to the active ledger, which will
+                // skip all entries present in the compacted ledger but not 
present in the managed ledger.
+                final var consumer = 
singleConsumerDispatcher.getActiveConsumer();
+                if (consumer != null
+                        && consumer.readCompacted()
+                        && 
cursor.getManagedLedger().getOptionalLedgerInfo(position.getLedgerId()).isEmpty())
 {

Review Comment:
   Should we compare the position being acked with the first ledger in 
`ledgers`? 
   - less than: skip this acknowledgement, which is the same as the current PR 
does
   - otherwise: throw an error, at least print a warning log, which means the 
users are doing an unexpected ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to