This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e603405c2a [fix][broker] Fix return the earliest position when query position by timestamp. (#20457)
0e603405c2a is described below

commit 0e603405c2a6faaa50a47881916a2c1268de9830
Author: hanmz <[email protected]>
AuthorDate: Thu Jun 29 19:47:48 2023 +0800

    [fix][broker] Fix return the earliest position when query position by timestamp. (#20457)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 17 +++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 +++++++-
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++++
 .../persistent/PersistentMessageFinder.java        |  2 +-
 .../pulsar/broker/delayed/MockManagedCursor.java   |  6 ++++
 .../service/PersistentMessageFinderTest.java       | 33 ++++++++++++++++++++++
 6 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index edbfa0b4320..d1ffdf6d2d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -637,6 +637,23 @@ public interface ManagedCursor {
     void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx);
 
+    /**
+     * Find the newest entry that matches the given predicate.
+     *
+     * @param constraint
+     *            search only active entries or all entries
+     * @param condition
+     *            predicate that reads an entry and applies a condition
+     * @param callback
+     *            callback object returning the resultant position
+     * @param ctx
+     *            opaque context
+     * @param isFindFromLedger
+     *            if {@code true}, run the search against the managed ledger instead of this cursor
+     */
+    void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
+            FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
+
     /**
      * reset the cursor to specified position to enable replay of messages.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f30f9553e15..8ce3a322c09 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1182,6 +1182,12 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx) {
+        asyncFindNewestMatching(constraint, condition, callback, ctx, false);
+    }
+
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
+            FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
         OpFindNewest op;
         PositionImpl startPosition = null;
         long max = 0;
@@ -1203,7 +1209,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                     Optional.empty(), ctx);
             return;
         }
-        op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
+        if (isFindFromLedger) {
+            op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
+        } else {
+            op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
+        }
         op.find();
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 2c01b778caf..04d99d3bdf4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -258,6 +258,11 @@ public class ManagedCursorContainerTest {
                 AsyncCallbacks.FindEntryCallback callback, Object ctx) {
         }
 
+        @Override
+        public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
+                AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
+        }
+
         @Override
         public void asyncResetCursor(final Position position, boolean forceReset,
                 AsyncCallbacks.ResetCursorCallback callback) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index d2e6f6f5ff8..08273155e4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
                     entry.release();
                 }
                 return false;
-            }, this, callback);
+            }, this, callback, true);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
index 499262c1e60..477290fc683 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -276,6 +277,11 @@ public class MockManagedCursor implements ManagedCursor {
 
     }
 
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
+            AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
+    }
+
     @Override
     public void resetCursor(Position position) throws InterruptedException, ManagedLedgerException {
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 48798f0020f..e77fd07c6ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -243,6 +243,39 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
+        final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(10);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+        ledger.addEntry(createMessageWrittenToLedger("msg1"));
+        ledger.addEntry(createMessageWrittenToLedger("msg2"));
+        ledger.addEntry(createMessageWrittenToLedger("msg3"));
+        Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
+        
+        long endTimestamp = System.currentTimeMillis() + 1000;
+
+        Result result = new Result();
+        // delete last position message
+        cursor.delete(lastPosition);
+        CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
+        future.get();
+        assertNull(result.exception);
+        assertNotEquals(result.position, null);
+        assertEquals(result.position, lastPosition);
+
+        result.reset();
+        cursor.close();
+        ledger.close();
+        factory.shutdown();
+    }
+
     @Test
     void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
 

Reply via email to