This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 29778d5bd3c [improve][broker]Find the target position at most once, 
during expiring messages for a topic, even though there are many subscriptions 
(#24622)
29778d5bd3c is described below

commit 29778d5bd3c97d065b5dbdb935cc8689e00d5c4c
Author: Apurva007 <apurvatelan...@gmail.com>
AuthorDate: Tue Aug 20 16:10:26 2024 +0800

    [improve][broker]Find the target position at most once, during expiring 
messages for a topic, even though there are many subscriptions (#24622)
    
    (cherry picked from commit 84205ebd849479edac2c6533ea9259091e2e5bed)
---
 .../bookkeeper/mledger/impl/OpFindNewest.java      |  3 +
 .../pulsar/broker/service/MessageExpirer.java      | 10 +++
 .../persistent/PersistentMessageExpiryMonitor.java | 15 ++++
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++++++++++--
 .../impl}/PersistentMessageExpiryMonitorTest.java  | 79 ++++++++++++++++++----
 .../broker/service/PersistentTopicE2ETest.java     |  4 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  7 +-
 7 files changed, 171 insertions(+), 23 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 8f86cb33ae8..31c1a090b50 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -250,6 +250,9 @@ class OpFindNewest implements ReadEntryCallback {
         return nextPosition;
     }
 
+    /**
+     * Find the largest entry that matches the given predicate.
+     */
     public void find() {
         if (cursor != null ? cursor.hasMoreEntries(searchPosition) : 
ledger.hasMoreEntries(searchPosition)) {
             ledger.asyncReadEntry(searchPosition, this, null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
index 3008717a3df..60cf5bd0523 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
@@ -25,9 +25,19 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface MessageExpirer {
 
+    /**
+     * Mark delete the largest position that is less than or equal to {@code position}.
+     */
     boolean expireMessages(Position position);
 
+    /**
+     * Mark delete the largest position whose message publish timestamp is less than the current time
+     * ({@link System#currentTimeMillis()}) minus the TTL given by {@code messageTTLInSeconds}.
+     */
     boolean expireMessages(int messageTTLInSeconds);
 
+    /**
+     * Async implementation of {@link #expireMessages(int)}.
+     */
     CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 348284fc624..a51f02b0c0d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -34,6 +34,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistExcept
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -145,6 +146,20 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
     public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
         Position topicLastPosition = this.topic.getLastPosition();
+        ManagedLedger managedLedger = cursor.getManagedLedger();
+        if (managedLedger instanceof ManagedLedgerImpl ml) {
+            // Confirm the position is valid.
+            Optional<MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgerInfoOptional =
+                    ml.getOptionalLedgerInfo(messagePosition.getLedgerId());
+            if (ledgerInfoOptional.isPresent()) {
+                if (messagePosition.getEntryId() >= 0
+                        && ledgerInfoOptional.get().getEntries() - 1 >= 
messagePosition.getEntryId()) {
+                    findEntryComplete(messagePosition, null);
+                    return true;
+                }
+            }
+        }
+        // Fall back to the slower path: not a ManagedLedgerImpl, or the position was not confirmed valid.
         if (topicLastPosition.compareTo(messagePosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, 
given position {} is beyond "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b31ae804247..061cbb1b13e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.PositionBound;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -2115,19 +2116,82 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public void checkMessageExpiry() {
         int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
-        if (messageTtlInSeconds != 0) {
+        if (messageTtlInSeconds <= 0) {
+            return;
+        }
+
+        ManagedLedger managedLedger = getManagedLedger();
+        if (managedLedger instanceof ManagedLedgerImpl ml) {
+            checkMessageExpiryWithSharedPosition(ml, messageTtlInSeconds);
+        } else {
+            // Fall back to the slower solution if the managed ledger is not an instance of ManagedLedgerImpl:
+            // each subscription finds the position and handles expiry itself.
+            checkMessageExpiryWithoutSharedPosition(messageTtlInSeconds);
+        }
+    }
+
+    private void checkMessageExpiryWithoutSharedPosition(int 
messageTtlInSeconds) {
+        subscriptions.forEach((__, sub) -> {
+            if (!isCompactionSubscription(sub.getName())
+                    && (additionalSystemCursorNames.isEmpty()
+                    || !additionalSystemCursorNames.contains(sub.getName()))) {
+                sub.expireMessagesAsync(messageTtlInSeconds);
+            }
+        });
+        replicators.forEach((__, replicator)
+                -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
+        shadowReplicators.forEach((__, replicator)
+                -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
+    }
+
+    private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, 
int messageTtlInSeconds) {
+        // Find the target position only once, then expire messages for all subscriptions and replicators.
+        ManagedCursor cursor = 
ml.getCursors().getCursorWithOldestPosition().getCursor();
+        PersistentMessageFinder finder = new PersistentMessageFinder(topic, 
cursor, brokerService.getPulsar()
+                
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
+        // Find the target position.
+        long expiredMessageTimestamp = System.currentTimeMillis() - 
TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
+        CompletableFuture<Position> positionToMarkDelete = new 
CompletableFuture<>();
+        finder.findMessages(expiredMessageTimestamp, new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionToMarkDelete.complete(position);
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                                        Object ctx) {
+                log.error("[{}] Error finding expired position, failed reading 
position is {}", topic,
+                        failedReadPosition.orElse(null), exception);
+                // The error is already logged here; complete with null so the next step does not log it again.
+                positionToMarkDelete.complete(null);
+            }
+        });
+        positionToMarkDelete.thenAccept(position -> {
+            if (position == null) {
+                // Nothing needs to be expired.
+                return;
+            }
+            // Expire messages by position, which is more efficient.
             subscriptions.forEach((__, sub) -> {
                 if (!isCompactionSubscription(sub.getName())
                         && (additionalSystemCursorNames.isEmpty()
-                            || 
!additionalSystemCursorNames.contains(sub.getName()))) {
-                   sub.expireMessagesAsync(messageTtlInSeconds);
+                        || 
!additionalSystemCursorNames.contains(sub.getName()))) {
+                    // The variable "position" is the position to mark delete.
+                    // The method "expireMessages(position)" mark deletes the given position if it is
+                    // valid; otherwise, it mark deletes the previous valid position.
+                    // So we pass it the position to be mark deleted.
+                    sub.expireMessages(position);
                 }
             });
             replicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
+                    -> ((PersistentReplicator) 
replicator).expireMessages(position));
             shadowReplicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessagesAsync(messageTtlInSeconds));
-        }
+                    -> ((PersistentReplicator) 
replicator).expireMessages(position));
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to expire messages by position", topic, ex);
+            return null;
+        });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
similarity index 62%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
rename to 
pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
index 5535561a5fa..39aec66726e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
@@ -16,28 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.persistent;
+package org.apache.bookkeeper.mledger.impl;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.testng.AssertJUnit.assertEquals;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -59,6 +62,11 @@ public class PersistentMessageExpiryMonitorTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void doInitConf() throws Exception {
+        conf.setMessageExpiryCheckIntervalInMinutes(60);
+    }
+
     /***
      * Confirm the anti-concurrency mechanism 
"expirationCheckInProgressUpdater" works.
      */
@@ -76,7 +84,7 @@ public class PersistentMessageExpiryMonitorTest extends 
ProducerConsumerBase {
                 (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(cursorName);
-        ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+        ManagedCursorImpl spyCursor = spy(cursor);
 
         // Make the mark-deleting delay.
         CountDownLatch firstFindingCompleted = new CountDownLatch(1);
@@ -98,14 +106,6 @@ public class PersistentMessageExpiryMonitorTest extends 
ProducerConsumerBase {
             calledFindPositionCount.incrementAndGet();
             return invocationOnMock.callRealMethod();
         }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), 
any(), any(), anyBoolean());
-        doAnswer(invocationOnMock -> {
-            calledFindPositionCount.incrementAndGet();
-            return invocationOnMock.callRealMethod();
-        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), 
anyBoolean());
-        doAnswer(invocationOnMock -> {
-            calledFindPositionCount.incrementAndGet();
-            return invocationOnMock.callRealMethod();
-        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any());
 
         // Sleep 2s to make "find(1s)" get a position.
         Thread.sleep(2000);
@@ -138,4 +138,57 @@ public class PersistentMessageExpiryMonitorTest extends 
ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName);
     }
+
+    /***
+     * Verify finding position task only executes once for multiple 
subscriptions of a topic.
+     */
+    @Test(invocationCount = 2)
+    void testTopicExpireMessages() throws Exception {
+        // Create topic.
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        final String cursorName1 = "s1";
+        final String cursorName2 = "s2";
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        admin.topics().createSubscriptionAsync(topicName, cursorName1, 
MessageId.earliest);
+        admin.topics().createSubscriptionAsync(topicName, cursorName2, 
MessageId.earliest);
+        admin.topicPolicies().setMessageTTL(topicName, 1);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(1, 
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get().intValue());
+        });
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ml.getConfig().setMaxEntriesPerLedger(2);
+        ml.getConfig().setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        long firstLedger = ml.currentLedger.getId();
+        System.out.println("maxEntriesPerLedger 1 : " + 
ml.getConfig().getMaxEntriesPerLedger());
+        // Trigger the creation of 3 ledgers.
+        for (int i = 0; i < 5; i++) {
+            producer.send("" + i);
+            Thread.sleep(100);
+        }
+        System.out.println("maxEntriesPerLedger 2 : " + 
ml.getConfig().getMaxEntriesPerLedger());
+        assertEquals(3, ml.getLedgersInfo().size());
+        // Inject a spy to count the accesses to the first ledger.
+        AtomicInteger accessedCount = new AtomicInteger();
+        ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get();
+        ReadHandle spyReadHandle = spy(readHandle);
+        doAnswer(invocationOnMock -> {
+            long startEntry = (long) invocationOnMock.getArguments()[0];
+            if (startEntry == 0) {
+                accessedCount.incrementAndGet();
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(spyReadHandle).readAsync(anyLong(), anyLong());
+        ml.ledgerCache.put(firstLedger, 
CompletableFuture.completedFuture(spyReadHandle));
+        // Verify: the first ledger will be accessed only once after expiry 
for two subscriptions.
+        persistentTopic.checkMessageExpiry();
+        Thread.sleep(2000);
+        assertEquals(1, accessedCount.get());
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topicName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c6fc9bd0eef..cf08dd48261 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1119,12 +1119,12 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         rolloverPerIntervalStats();
         assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
 
-        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
+        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs - 1));
         runMessageExpiryCheck();
 
         assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
 
-        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1 + messageTTLSecs / 2));
         runMessageExpiryCheck();
 
         assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 2d181d852a8..8bb05373aad 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -801,11 +801,14 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         p2.close();
         // Let the message expire
         for (String topic : topicList) {
+            // The TTL value cannot be set to a negative value; the minimum value is 1.
             PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
                     .getTopicIfExists(topic).get().get();
-            
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
-            
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
+            
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(1);
+            
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(1);
         }
+        // Wait 2 seconds for the messages to expire.
+        Thread.sleep(2000);
         pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
         //wait for checkMessageExpiry
         PersistentSubscription sub = (PersistentSubscription)

Reply via email to