This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 16ea0d88ee5 [fix] [broker][branch-3.0] Expire messages according to 
ledger close time to avoid client clock skew (#21940) (#22211)
16ea0d88ee5 is described below

commit 16ea0d88ee50dc9267c2da5a31e0e8b4f0d958d0
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 6 17:53:05 2024 -0800

    [fix] [broker][branch-3.0] Expire messages according to ledger close time 
to avoid client clock skew (#21940) (#22211)
    
    Co-authored-by: feynmanlin <[email protected]>
---
 .../persistent/PersistentMessageExpiryMonitor.java | 35 +++++++++++++++++++++-
 .../service/PersistentMessageFinderTest.java       | 25 +++++++++++++++-
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 41bc6098e1a..d82f3d93f8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,8 +31,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.protocol.Commands;
@@ -78,7 +81,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
             log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
                     messageTTLInSeconds);
-
+            // First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
+            checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
+            // Some part of entries in active Ledger may have reached TTL, so 
we need to continue searching.
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
                 try {
                     long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -99,6 +104,34 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
             return false;
         }
     }
+    private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
+        if (messageTTLInSeconds <= 0) {
+            return;
+        }
+        if (cursor instanceof ManagedCursorImpl managedCursor) {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
managedCursor.getManagedLedger();
+            Position deletedPosition = managedCursor.getMarkDeletedPosition();
+            SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgerInfoSortedMap =
+                    
managedLedger.getLedgersInfo().subMap(deletedPosition.getLedgerId(), true,
+                            managedLedger.getLedgersInfo().lastKey(), true);
+            MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
+            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
+                if (!ledgerInfo.hasTimestamp() || 
!MessageImpl.isEntryExpired(messageTTLInSeconds,
+                        ledgerInfo.getTimestamp())) {
+                    break;
+                }
+                info = ledgerInfo;
+            }
+            if (info != null && info.getLedgerId() > -1) {
+                PositionImpl position = PositionImpl.get(info.getLedgerId(), 
info.getEntries() - 1);
+                if (((PositionImpl) 
managedLedger.getLastConfirmedEntry()).compareTo(position) < 0) {
+                    findEntryComplete(managedLedger.getLastConfirmedEntry(), 
null);
+                } else {
+                    findEntryComplete(position, null);
+                }
+            }
+        }
+    }
 
     public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index e77fd07c6ef..d56968c8f8e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -80,8 +80,11 @@ import javax.ws.rs.core.MediaType;
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
     public static byte[] createMessageWrittenToLedger(String msg) {
+        return createMessageWrittenToLedger(msg, System.currentTimeMillis());
+    }
+    public static byte[] createMessageWrittenToLedger(String msg, long 
messageTimestamp) {
         MessageMetadata messageMetadata = new MessageMetadata()
-                    .setPublishTime(System.currentTimeMillis())
+                    .setPublishTime(messageTimestamp)
                     .setProducerName("createMessageWrittenToLedger")
                     .setSequenceId(1);
         ByteBuf data = 
UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
@@ -428,6 +431,26 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     }
 
+    @Test
+    public void testIncorrectClientClock() throws Exception {
+        final String ledgerAndCursorName = "testIncorrectClientClock";
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+        // set client clock to 10 days later
+        long incorrectPublishTimestamp = System.currentTimeMillis() + 
TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 10);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+        monitor.expireMessages(maxTTLSeconds);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+    }
+
     @Test
     void testMessageExpiryWithPosition() throws Exception {
         final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";

Reply via email to