poorbarcode commented on code in PR #24622: URL: https://github.com/apache/pulsar/pull/24622#discussion_r2271886162
########## pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java: ########## @@ -138,4 +147,52 @@ void testConcurrentlyExpireMessages() throws Exception { producer.close(); admin.topics().delete(topicName); } + + /*** + * Verify finding position task only executes once for multiple subscriptions of a topic. + */ + @Test + void testTopicExpireMessages() throws Exception { + // Create topic. + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(2); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pulsar.getDefaultManagedLedgerFactory() + .open(TopicName.get(topicName).getPersistenceNamingEncoding(), managedLedgerConfig); + long firstLedger = ml.currentLedger.getId(); + final String cursorName1 = "s1"; + final String cursorName2 = "s2"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + admin.topics().createSubscriptionAsync(topicName, cursorName1, MessageId.earliest); + admin.topics().createSubscriptionAsync(topicName, cursorName2, MessageId.earliest); + admin.topicPolicies().setMessageTTL(topicName, 1); + // Trigger 3 ledgers creation. + producer.send("1"); + producer.send("2"); + producer.send("4"); + producer.send("5"); + Assert.assertEquals(3, ml.getLedgersInfo().size()); + // Do a injection to count the access of the first ledger. 
+ AtomicInteger accessedCount = new AtomicInteger(); + ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get(); + ReadHandle spyReadHandle = spy(readHandle); + doAnswer(invocationOnMock -> { + long startEntry = (long) invocationOnMock.getArguments()[0]; + if (startEntry == 0) { + accessedCount.incrementAndGet(); + } + return invocationOnMock.callRealMethod(); + }).when(spyReadHandle).readAsync(anyLong(), anyLong()); + ml.ledgerCache.put(firstLedger, CompletableFuture.completedFuture(spyReadHandle)); + // Verify: the first ledger will be accessed only once after expiry for two subscriptions. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + persistentTopic.checkMessageExpiry(); + Thread.sleep(2000); + assertEquals(1, accessedCount.get()); + + // cleanup. + producer.close(); + admin.topics().delete(topicName); Review Comment: No, the topic deletion only checks whether there are consumers connected. It does not care about how many subscriptions exist ########## pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java: ########## @@ -138,4 +147,52 @@ void testConcurrentlyExpireMessages() throws Exception { producer.close(); admin.topics().delete(topicName); } + + /*** + * Verify finding position task only executes once for multiple subscriptions of a topic. + */ + @Test + void testTopicExpireMessages() throws Exception { + // Create topic. 
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(2); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pulsar.getDefaultManagedLedgerFactory() + .open(TopicName.get(topicName).getPersistenceNamingEncoding(), managedLedgerConfig); + long firstLedger = ml.currentLedger.getId(); + final String cursorName1 = "s1"; + final String cursorName2 = "s2"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + admin.topics().createSubscriptionAsync(topicName, cursorName1, MessageId.earliest); + admin.topics().createSubscriptionAsync(topicName, cursorName2, MessageId.earliest); + admin.topicPolicies().setMessageTTL(topicName, 1); + // Trigger 3 ledgers creation. + producer.send("1"); + producer.send("2"); + producer.send("4"); + producer.send("5"); + Assert.assertEquals(3, ml.getLedgersInfo().size()); + // Do a injection to count the access of the first ledger. + AtomicInteger accessedCount = new AtomicInteger(); + ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get(); + ReadHandle spyReadHandle = spy(readHandle); + doAnswer(invocationOnMock -> { + long startEntry = (long) invocationOnMock.getArguments()[0]; + if (startEntry == 0) { + accessedCount.incrementAndGet(); + } + return invocationOnMock.callRealMethod(); + }).when(spyReadHandle).readAsync(anyLong(), anyLong()); + ml.ledgerCache.put(firstLedger, CompletableFuture.completedFuture(spyReadHandle)); + // Verify: the first ledger will be accessed only once after expiry for two subscriptions. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + persistentTopic.checkMessageExpiry(); + Thread.sleep(2000); + assertEquals(1, accessedCount.get()); + + // cleanup. 
+ producer.close(); + admin.topics().delete(topicName); Review Comment: No, the topic deletion only checks whether there are consumers connected. It does not care about how many subscriptions exist -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org