This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2eb4eabc84f [improve][broker] Optimize subscription seek (cursor 
reset) by timestamp (#22792)
2eb4eabc84f is described below

commit 2eb4eabc84f68fef5b29d894631c7c23d06ec3af
Author: 道君 <[email protected]>
AuthorDate: Thu Jan 9 21:05:39 2025 +0800

    [improve][broker] Optimize subscription seek (cursor reset) by timestamp 
(#22792)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  25 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  56 +++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 291 +++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  18 ++
 .../persistent/PersistentMessageFinder.java        |  69 ++++-
 .../service/persistent/PersistentSubscription.java |  19 +-
 .../service/PersistentMessageFinderTest.java       | 242 ++++++++++++++++-
 7 files changed, 690 insertions(+), 30 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 042e0399869..4e5e1236548 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -660,6 +660,31 @@ public interface ManagedCursor {
     void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
 
+
+    /**
+     * Find the newest entry that matches the given predicate, optionally limited to a position range.
+     *
+     * <p>This default implementation ignores {@code startPosition} and {@code endPosition}: it simply
+     * delegates to the overload that takes no range. Implementations that support a bounded search
+     * override this method.
+     *
+     * @param constraint
+     *            search only active entries or all entries
+     * @param condition
+     *            predicate that reads an entry and applies a condition
+     * @param startPosition
+     *            start position to search from (not used by this default implementation)
+     * @param endPosition
+     *            end position to search to (not used by this default implementation)
+     * @param callback
+     *            callback object returning the resultant position
+     * @param ctx
+     *            opaque context
+     * @param isFindFromLedger
+     *            find the newest entry from ledger
+     */
+    default void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                 Position startPosition, Position endPosition, 
FindEntryCallback callback,
+                                 Object ctx, boolean isFindFromLedger) {
+        // The range arguments are intentionally dropped here; see the note in the Javadoc.
+        asyncFindNewestMatching(constraint, condition, callback, ctx, 
isFindFromLedger);
+    }
+
     /**
      * reset the cursor to specified position to enable replay of messages.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 934bfba4b0d..50f5f36b2d5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1272,27 +1272,55 @@ public class ManagedCursorImpl implements ManagedCursor 
{
     @Override
     public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
-        OpFindNewest op;
-        Position startPosition = null;
-        long max = 0;
+        asyncFindNewestMatching(constraint, condition, null, null, callback, 
ctx,
+                isFindFromLedger);
+    }
+
+
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                        Position start, Position end, 
FindEntryCallback callback,
+                                        Object ctx, boolean isFindFromLedger) {
+        Position startPosition;
         switch (constraint) {
-        case SearchAllAvailableEntries:
-            startPosition = getFirstPosition();
-            max = ledger.getNumberOfEntries() - 1;
-            break;
-        case SearchActiveEntries:
-            startPosition = ledger.getNextValidPosition(markDeletePosition);
-            max = getNumberOfEntriesInStorage();
-            break;
-        default:
-            callback.findEntryFailed(new ManagedLedgerException("Unknown 
position constraint"), Optional.empty(), ctx);
-            return;
+            case SearchAllAvailableEntries ->
+                    startPosition = start == null ?  getFirstPosition() : 
start;
+            case SearchActiveEntries -> {
+                if (start == null) {
+                    startPosition = 
ledger.getNextValidPosition(markDeletePosition);
+                } else {
+                    startPosition = start;
+                    startPosition = 
startPosition.compareTo(markDeletePosition) <= 0
+                            ? ledger.getNextValidPosition(startPosition) : 
startPosition;
+                }
+            }
+            default -> {
+                callback.findEntryFailed(
+                        new ManagedLedgerException("Unknown position 
constraint"), Optional.empty(), ctx);
+                return;
+            }
         }
+        // startPosition can't be null, should never go here.
         if (startPosition == null) {
             callback.findEntryFailed(new ManagedLedgerException("Couldn't find 
start position"),
                     Optional.empty(), ctx);
             return;
         }
+        // Calculate the end position
+        Position endPosition = end == null ? ledger.lastConfirmedEntry : end;
+        endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? 
ledger.lastConfirmedEntry : endPosition;
+        // Calculate the number of entries between the startPosition and 
endPosition
+        long max = 0;
+        if (startPosition.compareTo(endPosition) <= 0) {
+            max = ledger.getNumberOfEntries(Range.closed(startPosition, 
endPosition));
+        }
+
+        if (max <= 0) {
+            callback.findEntryComplete(null, ctx);
+            return;
+        }
+
+        OpFindNewest op;
         if (isFindFromLedger) {
             op = new OpFindNewest(this.ledger, startPosition, condition, max, 
callback, ctx);
         } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 69b74fcf8f5..d3ea98131ad 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4873,6 +4873,297 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
     }
 
+    @Test
+    public void 
testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws 
Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", 
managedLedgerConfig);
+        @Cleanup
+        ManagedCursor managedCursor = ledger.openCursor("test");
+
+        Position position = ledger.addEntry("test".getBytes(Encoding));
+        Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+        Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+        Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+
+        Predicate<Entry> condition = entry -> {
+            try {
+                Position p = entry.getPosition();
+                return p.compareTo(position1) <= 0;
+            } finally {
+                entry.release();
+            }
+        };
+
+        // find the newest entry with start and end position
+        AtomicBoolean failed = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Position> positionRef = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, position, position2, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef.set(position);
+                latch.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed.set(true);
+                latch.countDown();
+            }
+        }, null, true);
+
+        latch.await();
+        assertFalse(failed.get());
+        assertNotNull(positionRef.get());
+        assertEquals(positionRef.get(), position1);
+
+        // find the newest entry with start
+        AtomicBoolean failed1 = new AtomicBoolean(false);
+        CountDownLatch latch1 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef1 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, position, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef1.set(position);
+                latch1.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed1.set(true);
+                latch1.countDown();
+            }
+        }, null, true);
+        latch1.await();
+        assertFalse(failed1.get());
+        assertNotNull(positionRef1.get());
+        assertEquals(positionRef1.get(), position1);
+
+        // find the newest entry with end
+        AtomicBoolean failed2 = new AtomicBoolean(false);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef2 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, null, position2, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef2.set(position);
+                latch2.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed2.set(true);
+                latch2.countDown();
+            }
+        }, null, true);
+        latch2.await();
+        assertFalse(failed2.get());
+        assertNotNull(positionRef2.get());
+        assertEquals(positionRef2.get(), position1);
+
+        // find the newest entry without start and end position
+        AtomicBoolean failed3 = new AtomicBoolean(false);
+        CountDownLatch latch3 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef3 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef3.set(position);
+                latch3.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed3.set(true);
+                latch3.countDown();
+            }
+        }, null, true);
+        latch3.await();
+        assertFalse(failed3.get());
+        assertNotNull(positionRef3.get());
+        assertEquals(positionRef3.get(), position1);
+
+        // find position3
+        AtomicBoolean failed4 = new AtomicBoolean(false);
+        CountDownLatch latch4 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef4 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 entry -> {
+            try {
+                Position p = entry.getPosition();
+                return p.compareTo(position3) <= 0;
+            } finally {
+                entry.release();
+            }
+        }, position3, position3, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef4.set(position);
+                latch4.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed4.set(true);
+                latch4.countDown();
+            }
+        }, null, true);
+        latch4.await();
+        assertFalse(failed4.get());
+        assertNotNull(positionRef4.get());
+        assertEquals(positionRef4.get(), position3);
+    }
+
+
+    @Test
+    public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() 
throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", 
managedLedgerConfig);
+        @Cleanup
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
ledger.openCursor("test");
+
+        Position position = ledger.addEntry("test".getBytes(Encoding));
+        Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+        Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+        Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+        Position position4 = ledger.addEntry("test4".getBytes(Encoding));
+        managedCursor.markDelete(position1);
+        assertEquals(managedCursor.getNumberOfEntries(), 3);
+
+        Predicate<Entry> condition = entry -> {
+            try {
+                Position p = entry.getPosition();
+                return p.compareTo(position3) <= 0;
+            } finally {
+                entry.release();
+            }
+        };
+
+        // find the newest entry with start and end position
+        AtomicBoolean failed = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Position> positionRef = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, position2, position4, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef.set(position);
+                latch.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed.set(true);
+                latch.countDown();
+            }
+        }, null, true);
+        latch.await();
+        assertFalse(failed.get());
+        assertNotNull(positionRef.get());
+        assertEquals(positionRef.get(), position3);
+
+        // find the newest entry with start
+        AtomicBoolean failed1 = new AtomicBoolean(false);
+        CountDownLatch latch1 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef1 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, position2, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef1.set(position);
+                latch1.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed1.set(true);
+                latch1.countDown();
+            }
+        }, null, true);
+
+        latch1.await();
+        assertFalse(failed1.get());
+        assertNotNull(positionRef1.get());
+        assertEquals(positionRef1.get(), position3);
+
+        // find the newest entry with end
+        AtomicBoolean failed2 = new AtomicBoolean(false);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef2 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, null, position4, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef2.set(position);
+                latch2.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed2.set(true);
+                latch2.countDown();
+            }
+        }, null, true);
+
+        latch2.await();
+        assertFalse(failed2.get());
+        assertNotNull(positionRef2.get());
+        assertEquals(positionRef2.get(), position3);
+
+        // find the newest entry without start and end position
+        AtomicBoolean failed3 = new AtomicBoolean(false);
+        CountDownLatch latch3 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef3 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef3.set(position);
+                latch3.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed3.set(true);
+                latch3.countDown();
+            }
+        }, null, true);
+        latch3.await();
+        assertFalse(failed3.get());
+        assertNotNull(positionRef3.get());
+        assertEquals(positionRef3.get(), position3);
+
+        // find position4
+        AtomicBoolean failed4 = new AtomicBoolean(false);
+        CountDownLatch latch4 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef4 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
+            try {
+                Position p = entry.getPosition();
+                return p.compareTo(position4) <= 0;
+            } finally {
+                entry.release();
+            }
+        }, position4, position4, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef4.set(position);
+                latch4.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed4.set(true);
+                latch4.countDown();
+            }
+        }, null, true);
+        latch4.await();
+        assertFalse(failed4.get());
+        assertNotNull(positionRef4.get());
+        assertEquals(positionRef4.get(), position4);
+    }
+
     @Test
     void testForceCursorRecovery() throws Exception {
         TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0b6f0e9418c..d27661d0ee6 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2238,6 +2238,24 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "Max time before triggering a rollover on a cursor ledger"
     )
     private int managedLedgerCursorRolloverTimeInSeconds = 14400;
+
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            dynamic = true,
+            doc = "When resetting a subscription by timestamp, the broker will 
use the"
+                    + " ledger closing timestamp metadata to determine the 
range of ledgers"
+                    + " to search for the message where the subscription 
position is reset to. "
+                    + " Since by default, the search condition is based on the 
message publish time provided by the "
+                    + " client at the publish time, there will be some clock 
skew between the ledger closing timestamp "
+                    + " metadata and the publish time."
+                    + " This configuration is used to set the max clock skew 
between the ledger closing"
+                    + " timestamp and the message publish time for finding the 
range of ledgers to open for searching."
+                    + " The default value is 60000 milliseconds (60 seconds). 
When set to -1, the broker will not"
+                    + " use the ledger closing timestamp metadata to determine 
the range of ledgers to search for the"
+                    + " message."
+    )
+    private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis 
= 60000;
+
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "Max number of `acknowledgment holes` that are going to be 
persistently stored.\n\n"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 08273155e4c..5a4631cf205 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -25,6 +25,9 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback {
     private final ManagedCursor cursor;
     private final String subName;
+    private final int ledgerCloseTimestampMaxClockSkewMillis;
     private final String topicName;
     private long timestamp = 0;
 
@@ -48,19 +52,23 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
             AtomicIntegerFieldUpdater
                     .newUpdater(PersistentMessageFinder.class, 
"messageFindInProgress");
 
-    public PersistentMessageFinder(String topicName, ManagedCursor cursor) {
+    public PersistentMessageFinder(String topicName, ManagedCursor cursor, int 
ledgerCloseTimestampMaxClockSkewMillis) {
         this.topicName = topicName;
         this.cursor = cursor;
         this.subName = Codec.decode(cursor.getName());
+        this.ledgerCloseTimestampMaxClockSkewMillis = 
ledgerCloseTimestampMaxClockSkewMillis;
     }
 
     public void findMessages(final long timestamp, 
AsyncCallbacks.FindEntryCallback callback) {
-        this.timestamp = timestamp;
         if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+            this.timestamp = timestamp;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Starting message position find at timestamp 
{}", subName, timestamp);
             }
-
+            Pair<Position, Position> range =
+                    
getFindPositionRange(cursor.getManagedLedger().getLedgersInfo().values(),
+                            cursor.getManagedLedger().getLastConfirmedEntry(), 
timestamp,
+                            ledgerCloseTimestampMaxClockSkewMillis);
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 entry -> {
                 try {
                     long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -71,7 +79,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
                     entry.release();
                 }
                 return false;
-            }, this, callback, true);
+            }, range.getLeft(), range.getRight(), this, callback, true);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore message position find scheduled 
task, last find is still running", topicName,
@@ -83,6 +91,59 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
         }
     }
 
+    /**
+     * Compute the position range to search when looking for a message by timestamp, using the
+     * close timestamps stored in the ledger metadata.
+     *
+     * <p>The target timestamp is widened by {@code ledgerCloseTimestampMaxClockSkewMillis} in both
+     * directions. While iterating the ledgers, the start is moved to entry 0 of every ledger whose
+     * close timestamp is at or before the lower bound, and the end is set to the last entry of the
+     * first ledger whose close timestamp is after the upper bound. Iteration stops at that ledger, or
+     * at the first ledger whose close timestamp is 0 (treated as still open).
+     *
+     * <p>NOTE(review): the logic assumes {@code ledgerInfos} is iterated in ascending ledger order;
+     * confirm against the caller.
+     *
+     * @param ledgerInfos metadata of the ledgers to consider
+     * @param lastConfirmedEntry last confirmed entry of the managed ledger, may be {@code null}
+     * @param targetTimestamp timestamp being searched for
+     * @param ledgerCloseTimestampMaxClockSkewMillis tolerated skew between ledger close timestamps and
+     *            the target timestamp; a negative value disables the range calculation
+     * @return a pair of (start, end); either side may be {@code null}, and both are {@code null} when
+     *         the feature is disabled or a ledger has no timestamp
+     */
+    public static Pair<Position, Position> 
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
+                                                                Position 
lastConfirmedEntry, long targetTimestamp,
+                                                                int 
ledgerCloseTimestampMaxClockSkewMillis) {
+        if (ledgerCloseTimestampMaxClockSkewMillis < 0) {
+            // this feature is disabled when the value is negative
+            return Pair.of(null, null);
+        }
+
+        // Search window: the target timestamp widened by the allowed clock skew on both sides.
+        long targetTimestampMin = targetTimestamp - 
ledgerCloseTimestampMaxClockSkewMillis;
+        long targetTimestampMax = targetTimestamp + 
ledgerCloseTimestampMaxClockSkewMillis;
+
+        Position start = null;
+        Position end = null;
+
+        // Track the last two ledgers visited by the loop (not necessarily the last two in ledgerInfos,
+        // because the loop may break early).
+        LedgerInfo secondToLastLedgerInfo = null;
+        LedgerInfo lastLedgerInfo = null;
+        for (LedgerInfo info : ledgerInfos) {
+            if (!info.hasTimestamp()) {
+                // unexpected case, don't set start and end
+                return Pair.of(null, null);
+            }
+            secondToLastLedgerInfo = lastLedgerInfo;
+            lastLedgerInfo = info;
+            long closeTimestamp = info.getTimestamp();
+            // For an open ledger, closeTimestamp is 0
+            if (closeTimestamp == 0) {
+                end = null;
+                break;
+            }
+            if (closeTimestamp <= targetTimestampMin) {
+                // Closed at or before the lower bound: the newest such ledger seen so far is the start.
+                start = PositionFactory.create(info.getLedgerId(), 0);
+            } else if (closeTimestamp > targetTimestampMax) {
+                // Closed after the upper bound: the search can end at this ledger's last entry.
+                end = PositionFactory.create(info.getLedgerId(), 
info.getEntries() - 1);
+                break;
+            }
+        }
+        // If the ledger visited just before the last visited one was closed before the lower bound
+        // (targetTimestampMin), start from the first entry of the last visited ledger, provided the
+        // last confirmed entry has reached that ledger; otherwise fall back to lastConfirmedEntry
+        // (which may be null).
+        if (lastLedgerInfo != null && secondToLastLedgerInfo != null
+                && secondToLastLedgerInfo.getTimestamp() > 0
+                && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) 
{
+            Position firstPositionInLedger = 
PositionFactory.create(lastLedgerInfo.getLedgerId(), 0);
+            if (lastConfirmedEntry != null
+                    && lastConfirmedEntry.compareTo(firstPositionInLedger) >= 
0) {
+                start = firstPositionInLedger;
+            } else {
+                start = lastConfirmedEntry;
+            }
+        }
+        return Pair.of(start, end);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentMessageFinder.class);
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b5a1a9db5de..a96a7e75506 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -134,6 +134,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     private volatile CompletableFuture<Void> fenceFuture;
     private volatile CompletableFuture<Void> inProgressResetCursorFuture;
     private volatile Boolean replicatedControlled;
+    private final ServiceConfiguration config;
 
     static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
         return isReplicated != null && isReplicated ? 
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
@@ -156,6 +157,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     public PersistentSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
                                   Boolean replicated, Map<String, String> 
subscriptionProperties) {
         this.topic = topic;
+        this.config = topic.getBrokerService().getPulsar().getConfig();
         this.cursor = cursor;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
@@ -166,7 +168,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
         }
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
-        if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+        if (config.isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(TopicName.get(topicName))
                 && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -203,7 +205,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
     public boolean setReplicated(boolean replicated) {
         replicatedControlled = replicated;
-        ServiceConfiguration config = 
topic.getBrokerService().getPulsar().getConfig();
 
         if (!replicated || !config.isEnableReplicatedSubscriptions()) {
             this.replicatedSubscriptionSnapshotCache = null;
@@ -261,7 +262,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
                         case Shared:
                             if (dispatcher == null || dispatcher.getType() != 
SubType.Shared) {
                                 previousDispatcher = dispatcher;
-                                ServiceConfiguration config = 
topic.getBrokerService().getPulsar().getConfig();
                                 if 
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
                                     dispatcher = new 
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
                                 } else {
@@ -290,7 +290,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
                                     || !((StickyKeyDispatcher) dispatcher)
                                     .hasSameKeySharedPolicy(ksm)) {
                                 previousDispatcher = dispatcher;
-                                ServiceConfiguration config = 
topic.getBrokerService().getPulsar().getConfig();
                                 if 
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
                                     dispatcher =
                                             new 
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
@@ -426,7 +425,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
             cursor.asyncDelete(positions, deleteCallback, 
previousMarkDeletePosition);
-            if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled())
 {
+            if (config.isTransactionCoordinatorEnabled()) {
                 positions.forEach(position -> {
                     if ((cursor.isMessageDeleted(position))) {
                         pendingAckHandle.clearIndividualPosition(position);
@@ -602,10 +601,9 @@ public class PersistentSubscription extends 
AbstractSubscription {
         final EntryFilterSupport entryFilterSupport = dispatcher != null
                 ? (EntryFilterSupport) dispatcher : new 
EntryFilterSupport(this);
         // we put some hard limits on the scan, in order to prevent denial of 
services
-        ServiceConfiguration configuration = 
topic.getBrokerService().getPulsar().getConfiguration();
-        long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
-        long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
-        int batchSize = configuration.getDispatcherMaxReadBatchSize();
+        long maxEntries = config.getSubscriptionBacklogScanMaxEntries();
+        long timeOutMs = config.getSubscriptionBacklogScanMaxTimeMs();
+        int batchSize = config.getDispatcherMaxReadBatchSize();
         AtomicReference<Position> firstPosition = new AtomicReference<>();
         AtomicReference<Position> lastPosition = new AtomicReference<>();
         final Predicate<Entry> condition = entry -> {
@@ -780,7 +778,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
     @Override
     public CompletableFuture<Void> resetCursor(long timestamp) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        PersistentMessageFinder persistentMessageFinder = new 
PersistentMessageFinder(topicName, cursor);
+        PersistentMessageFinder persistentMessageFinder = new 
PersistentMessageFinder(topicName, cursor,
+                
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
 
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Resetting subscription to timestamp {}", 
topicName, subName, timestamp);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 176a799292a..6f2f1f3a1a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -138,7 +139,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     }
 
     CompletableFuture<Void> findMessage(final Result result, final 
ManagedCursor c1, final long timestamp) {
-        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1);
+        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 0);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         messageFinder.findMessages(timestamp, new 
AsyncCallbacks.FindEntryCallback() {
@@ -217,7 +218,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNotEquals(result.position, null);
         assertEquals(result.position, lastPosition);
 
-        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1);
+        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 0);
         final AtomicBoolean ex = new AtomicBoolean(false);
         messageFinder.findEntryFailed(new ManagedLedgerException("failed"), 
Optional.empty(),
                 new AsyncCallbacks.FindEntryCallback() {
@@ -589,4 +590,241 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         resetCursorData.setExcluded(true);
         System.out.println(Entity.entity(resetCursorData, 
MediaType.APPLICATION_JSON));
     }
+
+    @Test
+    public void testGetFindPositionRange_EmptyLedgerInfos() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        Position lastConfirmedEntry = null;
+        long targetTimestamp = 2000;
+        Pair<Position, Position> range =
+                PersistentMessageFinder.getFindPositionRange(ledgerInfos, 
lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_AllTimestampsLessThanTarget() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        Position lastConfirmedEntry = PositionFactory.create(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+    }
+
+    @Test
+    public void testGetFindPositionRange_LastTimestampIsZero() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(3, 5);
+
+        long targetTimestamp = 2000;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+    }
+
+    @Test
+    public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(2, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_AllTimestampsGreaterThanTarget() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(4000).build());
+        Position lastConfirmedEntry = PositionFactory.create(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getRight(), PositionFactory.create(1, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_MixedTimestamps() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        Position lastConfirmedEntry = PositionFactory.create(3, 9);
+
+        long targetTimestamp = 2500;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+        assertEquals(range.getRight(), PositionFactory.create(3, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_TimestampAtBoundary() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        Position lastConfirmedEntry = PositionFactory.create(4, 9);
+
+        long targetTimestamp = 3000;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+        // there might be entries in the next ledger with the same timestamp 
as the target timestamp, even though
+        // the close timestamp of ledger 3 is equal to the target timestamp
+        assertEquals(range.getRight(), PositionFactory.create(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkew() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(5, 5);
+
+        long targetTimestamp = 2009;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(1, 0));
+        assertEquals(range.getRight(), PositionFactory.create(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkewCase2() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(5, 5);
+
+        long targetTimestamp = 2995;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+        assertEquals(range.getRight(), PositionFactory.create(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkewCase3() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(5, 5);
+
+        long targetTimestamp = 3005;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+        assertEquals(range.getRight(), PositionFactory.create(4, 9));
+    }
+
+    @Test
+    public void 
testGetFindPositionRange_FeatureDisabledWithNegativeClockSkew() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(5, 5);
+
+        long targetTimestamp = 2009;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, -1);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_SingleLedger() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setTimestamp(0).build());
+        Position lastConfirmedEntry = PositionFactory.create(1, 5);
+
+        long targetTimestamp = 2500;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_SingleClosedLedger() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        Position lastConfirmedEntry = PositionFactory.create(1, 9);
+
+        long targetTimestamp = 2500;
+        Pair<Position, Position> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), PositionFactory.create(1, 0));
+    }
 }

Reply via email to