poorbarcode commented on code in PR #21250:
URL: https://github.com/apache/pulsar/pull/21250#discussion_r1337399346


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java:
##########
@@ -254,4 +266,125 @@ public void testFlowCountForMultiTopics() throws 
Exception {
 
         assertEquals(numFlow.get(), numPartitions);
     }
+
+    private void trimLedgers(final String tpName) {
+        // Wait for topic loading.
+        org.awaitility.Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            assertNotNull(persistentTopic);
+        });
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        CompletableFuture<Void> trimLedgersTask = new CompletableFuture<>();
+        ml.trimConsumedLedgersInBackground(trimLedgersTask);
+        trimLedgersTask.join();
+    }
+
+    @Test
+    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+        final String nonDurableCursor = "non-durable-cursor";
+        final String durableCursor = "durable-cursor";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                .subscriptionName(durableCursor).subscribe();
+        consumer.close();
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        producer.send("4");
+        MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) 
producer.send("5");
+
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg1.getValue(), "1");
+        Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg3.getValue(), "3");
+
+        // Unsubscribe durable cursor.
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        admin.topics().unload(topicName);
+        Thread.sleep(3 * 1000);
+        admin.topics().deleteSubscription(topicName, durableCursor);
+        // Trim ledgers after release durable cursor.
+        trimLedgers(topicName);
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topicName).ledgers;
+        assertEquals(ledgers.size(), 1);
+        assertNotEquals(ledgers.get(0).ledgerId, 
msgIdInDeletedLedger5.getLedgerId());
+
+        // Verify backlog and markDeletePosition is correct.
+        Awaitility.await().untilAsserted(() -> {
+            SubscriptionStats subscriptionStats = 
admin.topics().getStats(topicName, true, true, true)
+                    .getSubscriptions().get(nonDurableCursor);
+            log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+            assertEquals(subscriptionStats.getMsgBacklog(), 0);
+            ManagedLedgerInternalStats.CursorStats cursorStats =
+                    
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            String[] ledgerIdAndEntryId = 
cursorStats.markDeletePosition.split(":");
+            PositionImpl actMarkDeletedPos =
+                    PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
+            PositionImpl expectedMarkDeletedPos =
+                    PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), 
msgIdInDeletedLedger5.getEntryId());
+            log.info("Expected mark deleted position: {}", 
expectedMarkDeletedPos);
+            log.info("Actual mark deleted position: {}", 
cursorStats.markDeletePosition);
+            
Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+        });
+
+        // Clear the incoming queue of the reader for next test.
+        while (true) {
+            Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+        }

Review Comment:
   Good suggestion; this reminded me of a bug I found with 
`reader.hasMessageAvailable`:
   - https://github.com/apache/pulsar/pull/21259



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to