void-ptr974 commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3400031174


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic) 
throws Exception {
         Awaitility.await().untilAsserted(() -> assertEquals(
                 admin.topics().compactionStatus(topic).status, 
LongRunningProcessStatus.Status.SUCCESS));
     }
+
+    @Test
+    public void testReaderReadOnDeletedLedger() throws Exception {
+        final var topic = 
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+        try (final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            for (int i = 0; i < 3; i++) {
+                producer.newMessage().key("key-" + i).value("value-" + 
i).send();
+            }
+        }
+        // Trigger the ledger rollover
+        var ml = (ManagedLedgerImpl) ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .orElseThrow()).getManagedLedger();
+        ml.getConfig().setMaxEntriesPerLedger(1);
+        ml.getConfig().setMaxSizePerLedgerMb(0);
+        ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ml.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(ml.getLedgersInfo().size(), 2));
+
+        final var subName = "sub-" + System.currentTimeMillis();
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest).create();
+
+        // Slow down the pre-fetching
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+        // Receive 1 message so that the startMessageId will be reset to 
ledger_id:0 after reconnection
+        assertTrue(reader.hasMessageAvailable());
+        final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+        assertNotNull(firstMsg);
+
+        triggerAndWaitCompaction(topic);
+
+        // Trigger the reconnection and trim the first ledger.
+        admin.namespaces().unload("my-tenant/my-ns");
+        // Simulate the pending cumulative acknowledgment is flushed after the 
consumer is created
+        // We don't need such interception if we can support controlling the 
acknowledgment flush for reader.
+        final var firstTime = new AtomicBoolean(true);
+        consumerCreated = serverConsumer -> {

Review Comment:
   I might be missing some timing detail here, but since this hook is installed 
after `unload()`, the reader could reconnect and create the server-side 
consumer before the hook is active. In that case the delayed cumulative ACK 
would not be injected, and the test could still pass without covering the 
regression path. Would it be safer to install the hook before `unload()` and 
assert it fired?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to