This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0618877a62f [fix] [broker] Expire messages according to ledger close 
time to avoid client clock skew (#21940)
0618877a62f is described below

commit 0618877a62f26f320671136a0abd5c6b32e05477
Author: feynmanlin <[email protected]>
AuthorDate: Thu Feb 22 12:09:24 2024 +0800

    [fix] [broker] Expire messages according to ledger close time to avoid 
client clock skew (#21940)
---
 .../persistent/PersistentMessageExpiryMonitor.java | 36 +++++++++++++++++++++-
 .../service/PersistentMessageFinderTest.java       | 28 ++++++++++++++++-
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 5d3596d0d05..ac391c10503 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import javax.annotation.Nullable;
@@ -31,8 +32,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -82,7 +85,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
             log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
                     messageTTLInSeconds);
-
+            // First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
+            checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
+            // Some part of entries in active Ledger may have reached TTL, so 
we need to continue searching.
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
                 try {
                     long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -104,6 +109,35 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         }
     }
 
+    private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
+        if (messageTTLInSeconds <= 0) {
+            return;
+        }
+        if (cursor instanceof ManagedCursorImpl managedCursor) {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
managedCursor.getManagedLedger();
+            Position deletedPosition = managedCursor.getMarkDeletedPosition();
+            SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgerInfoSortedMap =
+                    
managedLedger.getLedgersInfo().subMap(deletedPosition.getLedgerId(), true,
+                            managedLedger.getLedgersInfo().lastKey(), true);
+            MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
+            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
+                if (!ledgerInfo.hasTimestamp() || 
!MessageImpl.isEntryExpired(messageTTLInSeconds,
+                        ledgerInfo.getTimestamp())) {
+                    break;
+                }
+                info = ledgerInfo;
+            }
+            if (info != null && info.getLedgerId() > -1) {
+                PositionImpl position = PositionImpl.get(info.getLedgerId(), 
info.getEntries() - 1);
+                if (((PositionImpl) 
managedLedger.getLastConfirmedEntry()).compareTo(position) < 0) {
+                    findEntryComplete(managedLedger.getLastConfirmedEntry(), 
null);
+                } else {
+                    findEntryComplete(position, null);
+                }
+            }
+        }
+    }
+
     @Override
     public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index f0e2e6eafcd..ace552a55a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -81,8 +81,11 @@ import javax.ws.rs.core.MediaType;
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
     public static byte[] createMessageWrittenToLedger(String msg) {
+        return createMessageWrittenToLedger(msg, System.currentTimeMillis());
+    }
+    public static byte[] createMessageWrittenToLedger(String msg, long 
messageTimestamp) {
         MessageMetadata messageMetadata = new MessageMetadata()
-                    .setPublishTime(System.currentTimeMillis())
+                    .setPublishTime(messageTimestamp)
                     .setProducerName("createMessageWrittenToLedger")
                     .setSequenceId(1);
         ByteBuf data = 
UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
@@ -437,6 +440,29 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     }
 
+    @Test
+    public void testIncorrectClientClock() throws Exception {
+        final String ledgerAndCursorName = "testIncorrectClientClock";
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+        // set client clock to 10 days later
+        long incorrectPublishTimestamp = System.currentTimeMillis() + 
TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 10);
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+        monitor.expireMessages(maxTTLSeconds);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+    }
+
     @Test
     void testMessageExpiryWithPosition() throws Exception {
         final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";

Reply via email to