BewareMyPower commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3400143729


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic) 
throws Exception {
         Awaitility.await().untilAsserted(() -> assertEquals(
                 admin.topics().compactionStatus(topic).status, 
LongRunningProcessStatus.Status.SUCCESS));
     }
+
+    @Test
+    public void testReaderReadOnDeletedLedger() throws Exception {
+        final var topic = 
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+        try (final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            for (int i = 0; i < 3; i++) {
+                producer.newMessage().key("key-" + i).value("value-" + 
i).send();
+            }
+        }
+        // Trigger the ledger rollover
+        var ml = (ManagedLedgerImpl) ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .orElseThrow()).getManagedLedger();
+        ml.getConfig().setMaxEntriesPerLedger(1);
+        ml.getConfig().setMaxSizePerLedgerMb(0);
+        ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ml.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(ml.getLedgersInfo().size(), 2));
+
+        final var subName = "sub-" + System.currentTimeMillis();
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest).create();
+
+        // Slow down the pre-fetching
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+        // Receive 1 message so that the startMessageId will be reset to 
ledger_id:0 after reconnection
+        assertTrue(reader.hasMessageAvailable());
+        final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+        assertNotNull(firstMsg);
+
+        triggerAndWaitCompaction(topic);
+
+        // Trigger the reconnection and trim the first ledger.
+        admin.namespaces().unload("my-tenant/my-ns");
+        // Simulate the pending cumulative acknowledgment is flushed after the 
consumer is created
+        // We don't need such interception if we can support controlling the 
acknowledgment flush for reader.
+        final var firstTime = new AtomicBoolean(true);
+        consumerCreated = serverConsumer -> {

Review Comment:
   Anyway, advance this modification before unload should be more safe, I've 
updated it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to