This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 0e2e571c1e5 [fix][broker] Fix seeking by timestamp can be reset the 
cursor position to earliest (#23919)
0e2e571c1e5 is described below

commit 0e2e571c1e5c2a6d9979c3120e62b7554b8f94a4
Author: 道君 <[email protected]>
AuthorDate: Tue Feb 11 14:25:13 2025 +0800

    [fix][broker] Fix seeking by timestamp can be reset the cursor position to 
earliest (#23919)
    
    (cherry picked from commit 59cf36f217daf8ab7b2665dc6b377c0375c8e87a)
---
 .../persistent/PersistentMessageFinder.java        |  19 +---
 .../service/PersistentMessageFinderTest.java       |   6 +-
 .../broker/service/SubscriptionSeekTest.java       | 118 +++++++++++++++++++++
 3 files changed, 123 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 5a4631cf205..e780f167284 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -91,6 +92,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
         }
     }
 
+    @VisibleForTesting
     public static Pair<Position, Position> 
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
                                                                 Position 
lastConfirmedEntry, long targetTimestamp,
                                                                 int 
ledgerCloseTimestampMaxClockSkewMillis) {
@@ -105,15 +107,11 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
         Position start = null;
         Position end = null;
 
-        LedgerInfo secondToLastLedgerInfo = null;
-        LedgerInfo lastLedgerInfo = null;
         for (LedgerInfo info : ledgerInfos) {
             if (!info.hasTimestamp()) {
                 // unexpected case, don't set start and end
                 return Pair.of(null, null);
             }
-            secondToLastLedgerInfo = lastLedgerInfo;
-            lastLedgerInfo = info;
             long closeTimestamp = info.getTimestamp();
             // For an open ledger, closeTimestamp is 0
             if (closeTimestamp == 0) {
@@ -128,19 +126,6 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
                 break;
             }
         }
-        // If the second-to-last ledger's close timestamp is less than the 
target timestamp, then start from the
-        // first entry of the last ledger when there are confirmed entries in 
the ledger
-        if (lastLedgerInfo != null && secondToLastLedgerInfo != null
-                && secondToLastLedgerInfo.getTimestamp() > 0
-                && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) 
{
-            Position firstPositionInLedger = 
PositionFactory.create(lastLedgerInfo.getLedgerId(), 0);
-            if (lastConfirmedEntry != null
-                    && lastConfirmedEntry.compareTo(firstPositionInLedger) >= 
0) {
-                start = firstPositionInLedger;
-            } else {
-                start = lastConfirmedEntry;
-            }
-        }
         return Pair.of(start, end);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6f2f1f3a1a2..62e27eaea41 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -636,7 +636,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNotNull(range);
         assertNotNull(range.getLeft());
         assertNull(range.getRight());
-        assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
     }
 
     @Test
@@ -654,7 +654,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNotNull(range);
         assertNotNull(range.getLeft());
         assertNull(range.getRight());
-        assertEquals(range.getLeft(), PositionFactory.create(2, 9));
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
     }
 
     @Test
@@ -689,7 +689,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNotNull(range);
         assertNotNull(range.getLeft());
         assertNotNull(range.getRight());
-        assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
         assertEquals(range.getRight(), PositionFactory.create(3, 9));
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 582d10294a5..3a9c5c43f1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -27,8 +27,10 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -50,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -61,6 +70,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -72,6 +82,10 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        conf.setManagedLedgerMaxEntriesPerLedger(10);
+        conf.setDefaultRetentionSizeInMB(100);
+        conf.setDefaultRetentionTimeInMinutes(100);
         super.baseSetup();
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
     }
@@ -489,6 +503,110 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
     }
 
+    @Test(timeOut = 30_000)
+    public void testSeekByTimestamp() throws Exception {
+        String topicName = "persistent://prop/use/ns-abc/testSeekByTimestamp";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, "my-sub", 
MessageId.earliest);
+
+        @Cleanup
+        Producer<String> producer =
+                
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        for (int i = 0; i < 25; i++) {
+            producer.send(("message-" + i));
+            Thread.sleep(10);
+        }
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+
+        Map<Long, MessageId> timestampToMessageId = new HashMap<>();
+        @Cleanup
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
+        while (reader.hasMessageAvailable()) {
+           Message<String> message = reader.readNext();
+              timestampToMessageId.put(message.getPublishTime(), 
message.getMessageId());
+        }
+
+        Assert.assertEquals(timestampToMessageId.size(), 25);
+
+        PersistentSubscription subscription = topic.getSubscription("my-sub");
+        ManagedCursor cursor = subscription.getCursor();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                
.topic(topicName).subscriptionName("my-sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        long[] timestamps = 
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
+        ArrayUtils.shuffle(timestamps);
+        for (long timestamp : timestamps) {
+            MessageIdImpl messageId = (MessageIdImpl) 
timestampToMessageId.get(timestamp);
+            consumer.seek(timestamp);
+            Position readPosition = cursor.getReadPosition();
+            Assert.assertEquals(readPosition.getLedgerId(), 
messageId.getLedgerId());
+            Assert.assertEquals(readPosition.getEntryId(), 
messageId.getEntryId());
+        }
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSeekByTimestampWithLedgerTrim() throws Exception {
+        String topicName = 
"persistent://prop/use/ns-abc/testSeekByTimestampWithLedgerTrim";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, "my-sub", 
MessageId.earliest);
+
+        @Cleanup
+        Producer<String> producer =
+                
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        for (int i = 0; i < 25; i++) {
+            producer.send(("message-" + i));
+            Thread.sleep(10);
+        }
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedger ledger = topic.getManagedLedger();
+        ManagedLedgerConfig config = ledger.getConfig();
+        config.setRetentionTime(0, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(0);
+
+        Map<Long, MessageId> timestampToMessageId = new HashMap<>();
+        @Cleanup
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext();
+            timestampToMessageId.put(message.getPublishTime(), 
message.getMessageId());
+        }
+
+        Assert.assertEquals(timestampToMessageId.size(), 25);
+
+        PersistentSubscription subscription = topic.getSubscription("my-sub");
+        ManagedCursor cursor = subscription.getCursor();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                
.topic(topicName).subscriptionName("my-sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        long[] timestamps = 
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
+        ArrayUtils.shuffle(timestamps);
+        boolean enterLedgerTrimmedBranch = false;
+        for (long timestamp : timestamps) {
+            MessageIdImpl messageId = (MessageIdImpl) 
timestampToMessageId.get(timestamp);
+            consumer.seek(timestamp);
+            CompletableFuture<?> trimFuture = new CompletableFuture<>();
+            ledger.trimConsumedLedgersInBackground(trimFuture);
+            trimFuture.get();
+            Position readPosition = cursor.getReadPosition();
+            Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
firstLedger = ledger.getLedgersInfo().firstEntry();
+            Assert.assertNotNull(firstLedger);
+            if (firstLedger.getKey() > messageId.getLedgerId()) {
+                Assert.assertEquals(readPosition.getLedgerId(), 
firstLedger.getKey());
+                Assert.assertEquals(readPosition.getEntryId(), 0);
+                enterLedgerTrimmedBranch = true;
+            } else {
+                Assert.assertEquals(readPosition.getLedgerId(), 
messageId.getLedgerId());
+                Assert.assertEquals(readPosition.getEntryId(), 
messageId.getEntryId());
+            }
+        }
+        // May have a chance to cause flaky test, because the result of 
`ArrayUtils.shuffle(timestamps);` is random.
+        Assert.assertTrue(enterLedgerTrimmedBranch);
+    }
+
     @Test
     public void testSeekTimeByFunction() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/test" + 
UUID.randomUUID();

Reply via email to