BewareMyPower commented on code in PR #25998:
URL: https://github.com/apache/pulsar/pull/25998#discussion_r3400133632
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -2648,4 +2664,74 @@ private void triggerAndWaitCompaction(String topic)
throws Exception {
Awaitility.await().untilAsserted(() -> assertEquals(
admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS));
}
+
+ @Test
+ public void testReaderReadOnDeletedLedger() throws Exception {
+ final var topic =
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+ try (final var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+ for (int i = 0; i < 3; i++) {
+ producer.newMessage().key("key-" + i).value("value-" +
i).send();
+ }
+ }
+ // Trigger the ledger rollover
+ var ml = (ManagedLedgerImpl) ((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get()
+ .orElseThrow()).getManagedLedger();
+ ml.getConfig().setMaxEntriesPerLedger(1);
+ ml.getConfig().setMaxSizePerLedgerMb(0);
+ ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ ml.rollCurrentLedgerIfFull();
+ Awaitility.await().untilAsserted(() ->
assertEquals(ml.getLedgersInfo().size(), 2));
+
+ final var subName = "sub-" + System.currentTimeMillis();
+ @Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+ .subscriptionName(subName)
+ .startMessageId(MessageId.earliest).create();
+
+ // Slow down the pre-fetching
+
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+ // Receive 1 message so that the startMessageId will be reset to
ledger_id:0 after reconnection
+ assertTrue(reader.hasMessageAvailable());
+ final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+ assertNotNull(firstMsg);
+
+ triggerAndWaitCompaction(topic);
+
+ // Trigger the reconnection and trim the first ledger.
+ admin.namespaces().unload("my-tenant/my-ns");
+ // Simulate the pending cumulative acknowledgment is flushed after the
consumer is created
+ // We don't need such interception if we can support controlling the
acknowledgment flush for reader.
+ final var firstTime = new AtomicBoolean(true);
+ consumerCreated = serverConsumer -> {
Review Comment:
reader's initial reconnect delay is 100 ms, this race should hardly happen.
In addition, the correctness to reproduce this issue also depends on the
trim operation later. if the read happened before that, the test could also
succeed without the fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]