This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new da8b69fd45b Fix return the earliest position when query position by 
timestamp. #20457
da8b69fd45b is described below

commit da8b69fd45bf00cdcbb6867145d4a004ce5985e7
Author: Jiwe Guo <[email protected]>
AuthorDate: Tue Jul 11 13:34:37 2023 +0800

    Fix return the earliest position when query position by timestamp. #20457
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 17 +++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 14 +++++++--
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++++
 .../persistent/PersistentMessageFinder.java        |  2 +-
 .../service/PersistentMessageFinderTest.java       | 33 ++++++++++++++++++++++
 5 files changed, 68 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 273c7df5b8c..0604e52596a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -550,6 +550,23 @@ public interface ManagedCursor {
     void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx);
 
+    /**
+     * Find the newest entry that matches the given predicate.
+     *
+     * @param constraint
+     *            search only active entries or all entries
+     * @param condition
+     *            predicate that reads an entry an applies a condition
+     * @param callback
+     *            callback object returning the resultant position
+     * @param ctx
+     *            opaque context
+     * @param isFindFromLedger
+     *            find the newest entry from ledger
+     */
+    void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                 FindEntryCallback callback, Object ctx, 
boolean isFindFromLedger);
+
     /**
      * reset the cursor to specified position to enable replay of messages.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 14b5d7965d6..c111fb6a6d9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1101,7 +1101,13 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
-            FindEntryCallback callback, Object ctx) {
+                                        FindEntryCallback callback, Object 
ctx) {
+        asyncFindNewestMatching(constraint, condition, callback, ctx, false);
+    }
+
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+            FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
         OpFindNewest op;
         PositionImpl startPosition = null;
         long max = 0;
@@ -1123,7 +1129,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                     Optional.empty(), ctx);
             return;
         }
-        op = new OpFindNewest(this, startPosition, condition, max, callback, 
ctx);
+        if (isFindFromLedger) {
+            op = new OpFindNewest(this.ledger, startPosition, condition, max, 
callback, ctx);
+        } else {
+            op = new OpFindNewest(this, startPosition, condition, max, 
callback, ctx);
+        }
         op.find();
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6446eca5ae0..fef43e30082 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -243,6 +243,11 @@ public class ManagedCursorContainerTest {
                 AsyncCallbacks.FindEntryCallback callback, Object ctx) {
         }
 
+        @Override
+        public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean 
isFindFromLedger) {
+        }
+
         @Override
         public void asyncResetCursor(final Position position, boolean 
forceReset,
                 AsyncCallbacks.ResetCursorCallback callback) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 825bc546f43..838771e6d31 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
                     entry.release();
                 }
                 return false;
-            }, this, callback);
+            }, this, callback, true);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore message position find scheduled 
task, last find is still running", topicName,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 1c1434caf21..4c1f29151e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -243,6 +243,39 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
+        final String ledgerAndCursorName = 
"testPersistentMessageFinderWhenLastMessageDelete";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(10);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+
+        ledger.addEntry(createMessageWrittenToLedger("msg1"));
+        ledger.addEntry(createMessageWrittenToLedger("msg2"));
+        ledger.addEntry(createMessageWrittenToLedger("msg3"));
+        Position lastPosition = 
ledger.addEntry(createMessageWrittenToLedger("last-message"));
+
+        long endTimestamp = System.currentTimeMillis() + 1000;
+
+        Result result = new Result();
+        // delete last position message
+        cursor.delete(lastPosition);
+        CompletableFuture<Void> future = findMessage(result, cursor, 
endTimestamp);
+        future.get();
+        assertNull(result.exception);
+        assertNotEquals(result.position, null);
+        assertEquals(result.position, lastPosition);
+
+        result.reset();
+        cursor.close();
+        ledger.close();
+        factory.shutdown();
+    }
+
     @Test
     void testPersistentMessageFinderWithBrokerTimestampForMessage() throws 
Exception {
 

Reply via email to