codelipenghui commented on code in PR #24622:
URL: https://github.com/apache/pulsar/pull/24622#discussion_r2271829526


##########
pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java:
##########
@@ -138,4 +147,52 @@ void testConcurrentlyExpireMessages() throws Exception {
         producer.close();
         admin.topics().delete(topicName);
     }
+
+    /***
+     * Verify finding position task only executes once for multiple 
subscriptions of a topic.
+     */
+    @Test
+    void testTopicExpireMessages() throws Exception {
+        // Create topic.
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
pulsar.getDefaultManagedLedgerFactory()
+                .open(TopicName.get(topicName).getPersistenceNamingEncoding(), 
managedLedgerConfig);
+        long firstLedger = ml.currentLedger.getId();
+        final String cursorName1 = "s1";
+        final String cursorName2 = "s2";
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        admin.topics().createSubscriptionAsync(topicName, cursorName1, 
MessageId.earliest);
+        admin.topics().createSubscriptionAsync(topicName, cursorName2, 
MessageId.earliest);
+        admin.topicPolicies().setMessageTTL(topicName, 1);
+        // Trigger 3 ledgers creation.
+        producer.send("1");
+        producer.send("2");
+        producer.send("4");
+        producer.send("5");
+        Assert.assertEquals(3, ml.getLedgersInfo().size());
+        // Do a injection to count the access of the first ledger.
+        AtomicInteger accessedCount = new AtomicInteger();
+        ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get();
+        ReadHandle spyReadHandle = spy(readHandle);
+        doAnswer(invocationOnMock -> {
+            long startEntry = (long) invocationOnMock.getArguments()[0];
+            if (startEntry == 0) {
+                accessedCount.incrementAndGet();
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(spyReadHandle).readAsync(anyLong(), anyLong());
+        ml.ledgerCache.put(firstLedger, 
CompletableFuture.completedFuture(spyReadHandle));
+        // Verify: the first ledger will be accessed only once after expiry 
for two subscriptions.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        persistentTopic.checkMessageExpiry();
+        Thread.sleep(2000);
+        assertEquals(1, accessedCount.get());
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topicName);

Review Comment:
   Interesting... Shouldn't this operation fail, since subscriptions 
still exist on the topic?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2118,19 +2119,75 @@ private CompletableFuture<Void> 
checkShadowReplication() {
     @Override
     public void checkMessageExpiry() {
         int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
-        if (messageTtlInSeconds != 0) {
+        if (messageTtlInSeconds <= 0) {
+            return;
+        }
+
+        // Fallback to the slower solution if managed ledger is not an 
instance of ManagedLedgerImpl: each
+        // subscription find position and handle expiring itself.
+        ManagedLedger managedLedger = getManagedLedger();
+        if (!(managedLedger instanceof ManagedLedgerImpl ml)) {
+            subscriptionsCheckMessageExpiryEachOther(messageTtlInSeconds);
+            return;
+        }
+
+        // Find the target position at one time, then expire all subscriptions 
and replicators.
+        ManagedCursor cursor = 
ml.getCursors().getCursorWithOldestPosition().getCursor();
+        PersistentMessageFinder finder = new PersistentMessageFinder(topic, 
cursor, brokerService.getPulsar()
+                
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
+        // Find the target position.
+        long expiredMessageTimestamp = System.currentTimeMillis() - 
TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
+        CompletableFuture<Position> positionToMarkDelete = new 
CompletableFuture<>();
+        finder.findMessages(expiredMessageTimestamp, new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionToMarkDelete.complete(position);
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                                        Object ctx) {
+                log.error("[{}] Error finding expired position, failed reading 
position is {}", topic,
+                        failedReadPosition.orElse(null), exception);
+                // Since we have logged the error, we can skip to print error 
log at next step.
+                positionToMarkDelete.complete(null);
+            }
+        });
+        positionToMarkDelete.thenAccept(position -> {
+            if (position == null) {
+                // Nothing need to be expired.
+                return;
+            }
+            // Expire messages by position, which is more efficient.
             subscriptions.forEach((__, sub) -> {
                 if (!isCompactionSubscription(sub.getName())
                         && (additionalSystemCursorNames.isEmpty()
-                            || 
!additionalSystemCursorNames.contains(sub.getName()))) {
-                   sub.expireMessagesAsync(messageTtlInSeconds);
+                        || 
!additionalSystemCursorNames.contains(sub.getName()))) {
+                    sub.expireMessages(position);

Review Comment:
   I noticed that expiring messages by position will try to get the previous 
position of `position`. Is this a behavior change from the current implementation?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2118,19 +2119,75 @@ private CompletableFuture<Void> 
checkShadowReplication() {
     @Override
     public void checkMessageExpiry() {
         int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
-        if (messageTtlInSeconds != 0) {
+        if (messageTtlInSeconds <= 0) {
+            return;
+        }
+
+        // Fallback to the slower solution if managed ledger is not an 
instance of ManagedLedgerImpl: each
+        // subscription find position and handle expiring itself.
+        ManagedLedger managedLedger = getManagedLedger();
+        if (!(managedLedger instanceof ManagedLedgerImpl ml)) {
+            subscriptionsCheckMessageExpiryEachOther(messageTtlInSeconds);
+            return;
+        }
+
+        // Find the target position at one time, then expire all subscriptions 
and replicators.
+        ManagedCursor cursor = 
ml.getCursors().getCursorWithOldestPosition().getCursor();
+        PersistentMessageFinder finder = new PersistentMessageFinder(topic, 
cursor, brokerService.getPulsar()
+                
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
+        // Find the target position.
+        long expiredMessageTimestamp = System.currentTimeMillis() - 
TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
+        CompletableFuture<Position> positionToMarkDelete = new 
CompletableFuture<>();
+        finder.findMessages(expiredMessageTimestamp, new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionToMarkDelete.complete(position);
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                                        Object ctx) {
+                log.error("[{}] Error finding expired position, failed reading 
position is {}", topic,
+                        failedReadPosition.orElse(null), exception);
+                // Since we have logged the error, we can skip to print error 
log at next step.
+                positionToMarkDelete.complete(null);
+            }
+        });
+        positionToMarkDelete.thenAccept(position -> {
+            if (position == null) {
+                // Nothing need to be expired.
+                return;
+            }
+            // Expire messages by position, which is more efficient.
             subscriptions.forEach((__, sub) -> {
                 if (!isCompactionSubscription(sub.getName())
                         && (additionalSystemCursorNames.isEmpty()
-                            || 
!additionalSystemCursorNames.contains(sub.getName()))) {
-                   sub.expireMessagesAsync(messageTtlInSeconds);
+                        || 
!additionalSystemCursorNames.contains(sub.getName()))) {
+                    sub.expireMessages(position);
                 }
             });
             replicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
+                    -> ((PersistentReplicator) 
replicator).expireMessages(position));
             shadowReplicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
-        }
+                    -> ((PersistentReplicator) 
replicator).expireMessages(position));
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to expire messages by position", topic, ex);
+            return null;
+        });
+    }
+
+    private void subscriptionsCheckMessageExpiryEachOther(int 
messageTtlInSeconds) {

Review Comment:
   I would like to suggest having 2 methods:
   
   - checkMessageExpiryWithoutSharedPosition()
   - checkMessageExpiryWithSharedPosition()
   
   That way, the checkMessageExpiry() method can call the 2 methods above, which 
should be better for code organization?



##########
pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java:
##########
@@ -138,4 +147,52 @@ void testConcurrentlyExpireMessages() throws Exception {
         producer.close();
         admin.topics().delete(topicName);
     }
+
+    /***
+     * Verify finding position task only executes once for multiple 
subscriptions of a topic.
+     */
+    @Test
+    void testTopicExpireMessages() throws Exception {
+        // Create topic.
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
pulsar.getDefaultManagedLedgerFactory()
+                .open(TopicName.get(topicName).getPersistenceNamingEncoding(), 
managedLedgerConfig);
+        long firstLedger = ml.currentLedger.getId();
+        final String cursorName1 = "s1";
+        final String cursorName2 = "s2";
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        admin.topics().createSubscriptionAsync(topicName, cursorName1, 
MessageId.earliest);
+        admin.topics().createSubscriptionAsync(topicName, cursorName2, 
MessageId.earliest);
+        admin.topicPolicies().setMessageTTL(topicName, 1);
+        // Trigger 3 ledgers creation.
+        producer.send("1");
+        producer.send("2");
+        producer.send("4");
+        producer.send("5");
+        Assert.assertEquals(3, ml.getLedgersInfo().size());
+        // Do a injection to count the access of the first ledger.
+        AtomicInteger accessedCount = new AtomicInteger();
+        ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get();
+        ReadHandle spyReadHandle = spy(readHandle);
+        doAnswer(invocationOnMock -> {
+            long startEntry = (long) invocationOnMock.getArguments()[0];
+            if (startEntry == 0) {
+                accessedCount.incrementAndGet();
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(spyReadHandle).readAsync(anyLong(), anyLong());
+        ml.ledgerCache.put(firstLedger, 
CompletableFuture.completedFuture(spyReadHandle));
+        // Verify: the first ledger will be accessed only once after expiry 
for two subscriptions.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        persistentTopic.checkMessageExpiry();
+        Thread.sleep(2000);
+        assertEquals(1, accessedCount.get());

Review Comment:
   Would it be better to use Awaitility instead of Thread.sleep?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to