This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d2b2942d8e01f78e2f991ea3997ee3613ad9d1c1
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Fri Aug 8 12:46:17 2025 +0800

    [improve][broker]Remove block calling that named cursor.asyncGetNth when 
expiring messages (#24606)
    
    (cherry picked from commit d275bd4004ab4d11e9527e10c3069126689ca10a)
---
 .../pulsar/broker/service/MessageExpirer.java      |   3 +
 .../nonpersistent/NonPersistentSubscription.java   |   6 ++
 .../persistent/PersistentMessageExpiryMonitor.java |   6 ++
 .../service/persistent/PersistentReplicator.java   |  17 ++++
 .../service/persistent/PersistentSubscription.java |  20 ++++
 .../broker/service/persistent/PersistentTopic.java |  41 +++++++-
 .../service/PersistentMessageFinderTest.java       | 110 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTest.java      |  30 ++++++
 8 files changed, 230 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
index 7cb1d2a904a..3008717a3df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -27,4 +28,6 @@ public interface MessageExpirer {
     boolean expireMessages(Position position);
 
     boolean expireMessages(int messageTTLInSeconds);
+
+    CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index cfe05cc32b7..b9083d68046 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -472,6 +472,12 @@ public class NonPersistentSubscription extends 
AbstractSubscription {
                 + " non-persistent topic.");
     }
 
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int 
messageTTLInSeconds) {
+        return CompletableFuture.failedFuture(new 
UnsupportedOperationException("Expire message by timestamp is not"
+                + " supported for non-persistent topic."));
+    }
+
     @Override
     public boolean expireMessages(Position position) {
         throw new UnsupportedOperationException("Expire message by position is 
not supported for"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 7a18f4abe47..93b03145cad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -80,6 +81,11 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
                 && 
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int 
messageTTLInSeconds) {
+        return CompletableFuture.supplyAsync(() -> 
expireMessages(messageTTLInSeconds), topic.getOrderedExecutor());
+    }
+
     @Override
     public boolean expireMessages(int messageTTLInSeconds) {
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cf3a71f7d73..fffbb1b6d3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -685,6 +685,23 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         return expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int 
messageTTLInSeconds) {
+        long backlog = cursor.getNumberOfEntriesInBacklog(false);
+        if (backlog == 0) {
+            return CompletableFuture.completedFuture(false);
+        } else if (backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) {
+            return topic.isOldestMessageExpiredAsync(cursor, 
messageTTLInSeconds).thenCompose(oldestMsgExpired -> {
+                if (oldestMsgExpired) {
+                    return 
expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+                } else {
+                    return CompletableFuture.completedFuture(false);
+                }
+            });
+        }
+        return expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+    }
+
     @Override
     public boolean expireMessages(Position position) {
         return expiryMonitor.expireMessages(position);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 86c4251f371..b3834017395 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1208,6 +1208,26 @@ public class PersistentSubscription extends 
AbstractSubscription {
         return expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int 
messageTTLInSeconds) {
+        long backlog = getNumberOfEntriesInBacklog(false);
+        if (backlog == 0) {
+            return CompletableFuture.completedFuture(false);
+        }
+        if (dispatcher != null && dispatcher.isConsumerConnected() && backlog 
< MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) {
+            return topic.isOldestMessageExpiredAsync(cursor, 
messageTTLInSeconds)
+                .thenCompose(oldestMsgExpired -> {
+                    if (oldestMsgExpired) {
+                        this.lastExpireTimestamp = System.currentTimeMillis();
+                        return 
expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+                    } else {
+                        return CompletableFuture.completedFuture(false);
+                    }
+            });
+        }
+        return expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+    }
+
     @Override
     public boolean expireMessages(Position position) {
         this.lastExpireTimestamp = System.currentTimeMillis();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2b7221600e9..d513c62ecb5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -2122,13 +2123,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 if (!isCompactionSubscription(sub.getName())
                         && (additionalSystemCursorNames.isEmpty()
                             || 
!additionalSystemCursorNames.contains(sub.getName()))) {
-                   sub.expireMessages(messageTtlInSeconds);
+                   sub.expireMessagesAsync(messageTtlInSeconds);
                 }
             });
             replicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
+                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
             shadowReplicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
+                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
         }
     }
 
@@ -3931,6 +3932,40 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return isOldestMessageExpired;
     }
 
+    public CompletableFuture<Boolean> 
isOldestMessageExpiredAsync(ManagedCursor cursor, int messageTTLInSeconds) {
+        CompletableFuture<Boolean> res = new CompletableFuture<>();
+        cursor.asyncGetNthEntry(1, IndividualDeletedEntries.Include, new 
AsyncCallbacks.ReadEntryCallback() {
+
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                long entryTimestamp = 0;
+                try {
+                    entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                    res.complete(MessageImpl.isEntryExpired(
+                            (int) (messageTTLInSeconds * 
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp));
+                } catch (IOException e) {
+                    log.warn("[{}] [{}] Error while getting the oldest 
message", topic, cursor.toString(), e);
+                    res.complete(false);
+                }
+
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException e, Object ctx) {
+                if 
(brokerService.pulsar().getConfiguration().isAutoSkipNonRecoverableData()
+                        && e instanceof NonRecoverableLedgerException) {
+                    // NonRecoverableLedgerException means the ledger or entry 
can't be read anymore.
+                    // if AutoSkipNonRecoverableData is set to true, just 
return true here.
+                    res.complete(true);
+                } else {
+                    log.warn("[{}] [{}] Error while getting the oldest 
message", topic, cursor.toString(), e);
+                    res.complete(false);
+                }
+            }
+        }, null);
+        return res;
+    }
+
     /**
      * Clears backlog for all cursors in the topic.
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 93b061f8420..135829160de 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -61,6 +62,8 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -450,6 +453,62 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     }
 
+    /**
+     * It tests that message expiry doesn't get stuck if it can't read deleted 
ledger's entry.
+     */
+    @Test
+    void testMessageExpiryAsyncWithTimestampNonRecoverableException() throws 
Exception {
+
+        final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
+        final int entriesPerLedger = 2;
+        final int totalEntries = 10;
+        final int ttlSeconds = 1;
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        config.setAutoSkipNonRecoverableData(true);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+
+        for (int i = 0; i < totalEntries; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i));
+        }
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
+
+        List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
+        LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
+        // The `lastLedgerInfo` should be newly opened, and it does not 
contain any entries.
+        // Please refer to: https://github.com/apache/pulsar/pull/22034
+        assertEquals(lastLedgerInfo.getEntries(), 0);
+        assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
+
+        // this will make sure that all entries should be deleted
+        Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
+
+        bkc.deleteLedger(ledgers.get(0).getLedgerId());
+        bkc.deleteLedger(ledgers.get(1).getLedgerId());
+        bkc.deleteLedger(ledgers.get(2).getLedgerId());
+
+        PersistentTopic mock = mockPersistentTopic("topicname");
+
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        assertTrue(monitor.expireMessagesAsync(ttlSeconds).get());
+        Awaitility.await().untilAsserted(() -> {
+            Position markDeletePosition = c1.getMarkDeletedPosition();
+            // The markDeletePosition points to the last entry of the previous 
ledger in lastLedgerInfo.
+            assertEquals(markDeletePosition.getLedgerId(), 
lastLedgerInfo.getLedgerId() - 1);
+            assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 
1);
+        });
+
+        c1.close();
+        ledger.close();
+        factory.shutdown();
+
+    }
+
     public void testFindMessageWithTimestampAutoSkipNonRecoverable() throws 
Exception {
 
         final String ledgerAndCursorName = 
"testFindMessageWithTimestampAutoSkipNonRecoverable";
@@ -625,6 +684,20 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
     }
 
+    private PersistentTopic mockPersistentTopic(String topicName) throws 
Exception {
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+        BrokerService brokerService = mock(BrokerService.class);
+        doReturn(brokerService).when(mock).getBrokerService();
+        doReturn(executor).when(mock).getOrderedExecutor();
+        PulsarService pulsarService = mock(PulsarService.class);
+        doReturn(pulsarService).when(brokerService).pulsar();
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        doReturn(serviceConfiguration).when(pulsarService).getConfig();
+        return mock;
+    }
+
     @Test
     public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() 
throws Throwable {
         final String ledgerAndCursorName = 
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
@@ -664,6 +737,43 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         Assert.assertNull(throwableAtomicReference.get());
     }
 
+    @Test
+    public void testCheckExpiryAsyncByLedgerClosureTimeWithAckUnclosedLedger() 
throws Throwable {
+        final String ledgerAndCursorName = 
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+        // set client clock to 10 days later
+        long incorrectPublishTimestamp = System.currentTimeMillis() + 
TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 7; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        PersistentTopic mock = mockPersistentTopic("topicname");
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
+                (AsyncCallbacks.MarkDeleteCallback) spy(
+                        FieldUtils.readDeclaredField(monitor, 
"markDeleteCallback", true));
+        FieldUtils.writeField(monitor, "markDeleteCallback", 
markDeleteCallback, true);
+
+        AtomicReference<Throwable> throwableAtomicReference = new 
AtomicReference<>();
+        Mockito.doAnswer(invocation -> {
+            ManagedLedgerException argument = invocation.getArgument(0, 
ManagedLedgerException.class);
+            throwableAtomicReference.set(argument);
+            return invocation.callRealMethod();
+        }).when(markDeleteCallback).markDeleteFailed(any(), any());
+
+        Position position = ledger.getLastConfirmedEntry();
+        c1.markDelete(position);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+        monitor.expireMessagesAsync(maxTTLSeconds).get();
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+
+        Assert.assertNull(throwableAtomicReference.get());
+    }
+
     @Test
     void testMessageExpiryWithPosition() throws Exception {
         final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index eefcc0a9f04..cd078fd89f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -668,6 +668,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(status.getReplicationBacklog(), 0);
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicatorExpireMsgAsync() throws Exception {
+
+        // This test is to verify that reset cursor fails on global topic
+        SortedSet<String> testDests = new TreeSet<>();
+
+        final TopicName dest = TopicName
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
+        testDests.add(dest.toString());
+
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        @Cleanup
+        MessageConsumer consumer1 = new MessageConsumer(url3, dest);
+
+        // Produce from cluster1 and consume from the rest
+        producer1.produce(2);
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+        PersistentReplicator replicator = (PersistentReplicator) spy(
+                
topic.getReplicators().get(topic.getReplicators().keys().stream().toList().get(0)));
+        replicator.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+        replicator.clearBacklog().get();
+        Thread.sleep(100);
+        replicator.updateRates(); // for code-coverage
+        replicator.expireMessagesAsync(1).get(); // for code-coverage
+        ReplicatorStats status = replicator.getStats();
+        assertEquals(status.getReplicationBacklog(), 0);
+    }
+
 
     @Test(timeOut = 30000)
     public void testResetReplicatorSubscriptionPosition() throws Exception {

Reply via email to