codelipenghui commented on code in PR #21250:
URL: https://github.com/apache/pulsar/pull/21250#discussion_r1338065417


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java:
##########
@@ -254,4 +266,125 @@ public void testFlowCountForMultiTopics() throws 
Exception {
 
         assertEquals(numFlow.get(), numPartitions);
     }
+
+    private void trimLedgers(final String tpName) {
+        // Wait for topic loading.
+        org.awaitility.Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            assertNotNull(persistentTopic);
+        });
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        CompletableFuture<Void> trimLedgersTask = new CompletableFuture<>();
+        ml.trimConsumedLedgersInBackground(trimLedgersTask);
+        trimLedgersTask.join();
+    }
+
+    @Test
+    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+        final String nonDurableCursor = "non-durable-cursor";
+        final String durableCursor = "durable-cursor";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                .subscriptionName(durableCursor).subscribe();
+        consumer.close();
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        producer.send("4");
+        MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) 
producer.send("5");
+
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg1.getValue(), "1");
+        Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg3.getValue(), "3");
+
+        // Unsubscribe durable cursor.
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        admin.topics().unload(topicName);
+        Thread.sleep(3 * 1000);
+        admin.topics().deleteSubscription(topicName, durableCursor);
+        // Trim ledgers after release durable cursor.
+        trimLedgers(topicName);
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topicName).ledgers;
+        assertEquals(ledgers.size(), 1);
+        assertNotEquals(ledgers.get(0).ledgerId, 
msgIdInDeletedLedger5.getLedgerId());
+
+        // Verify backlog and markDeletePosition is correct.
+        Awaitility.await().untilAsserted(() -> {
+            SubscriptionStats subscriptionStats = 
admin.topics().getStats(topicName, true, true, true)
+                    .getSubscriptions().get(nonDurableCursor);
+            log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+            assertEquals(subscriptionStats.getMsgBacklog(), 0);
+            ManagedLedgerInternalStats.CursorStats cursorStats =
+                    
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            String[] ledgerIdAndEntryId = 
cursorStats.markDeletePosition.split(":");
+            PositionImpl actMarkDeletedPos =
+                    PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
+            PositionImpl expectedMarkDeletedPos =
+                    PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), 
msgIdInDeletedLedger5.getEntryId());
+            log.info("Expected mark deleted position: {}", 
expectedMarkDeletedPos);
+            log.info("Actual mark deleted position: {}", 
cursorStats.markDeletePosition);
+            
Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+        });
+
+        // Clear the incoming queue of the reader for next test.
+        while (true) {
+            Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+        }
+
+        // The following tests are designed to verify the api 
"getNumberOfEntries" and "consumedEntries" still work
+        // after changes.See the code-description added with the PR 
https://github.com/apache/pulsar/pull/10667.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(nonDurableCursor);
+
+        // Verify "getNumberOfEntries" if there is no entries to consume.
+        assertEquals(0, cursor.getNumberOfEntries());
+        assertEquals(0, ml.getNumberOfEntries());
+
+        // Verify "getNumberOfEntries" if there is 1 entry to consume.
+        producer.send("6");
+        producer.send("7");
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(2, ml.getNumberOfEntries());
+            // Since there is one message has been pulled into the incoming 
queue of reader. There is only one messages
+            // waiting to cursor read.
+            assertEquals(1, cursor.getNumberOfEntries());
+        });
+
+        // Verify "consumedEntries" is correct.
+        ManagedLedgerInternalStats.CursorStats cursorStats =
+                
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+        // "messagesConsumedCounter" should be 0 after unload the topic.
+        assertEquals(0, cursorStats.messagesConsumedCounter);

Review Comment:
   Ah, ok I through it was messageOutCounter before. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to