This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0e603405c2a [fix][broker] Fix return the earliest position when query
position by timestamp. (#20457)
0e603405c2a is described below
commit 0e603405c2a6faaa50a47881916a2c1268de9830
Author: hanmz <[email protected]>
AuthorDate: Thu Jun 29 19:47:48 2023 +0800
[fix][broker] Fix return the earliest position when query position by
timestamp. (#20457)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 17 +++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 +++++++-
.../mledger/impl/ManagedCursorContainerTest.java | 5 ++++
.../persistent/PersistentMessageFinder.java | 2 +-
.../pulsar/broker/delayed/MockManagedCursor.java | 6 ++++
.../service/PersistentMessageFinderTest.java | 33 ++++++++++++++++++++++
6 files changed, 73 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index edbfa0b4320..d1ffdf6d2d7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -637,6 +637,23 @@ public interface ManagedCursor {
void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
FindEntryCallback callback, Object ctx);
+ /**
+ * Find the newest entry that matches the given predicate.
+ *
+ * @param constraint
+ * search only active entries or all entries
+ * @param condition
+ * predicate that reads an entry an applies a condition
+ * @param callback
+ * callback object returning the resultant position
+ * @param ctx
+ * opaque context
+ * @param isFindFromLedger
+ * find the newest entry from ledger
+ */
+ void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
+
/**
* reset the cursor to specified position to enable replay of messages.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f30f9553e15..8ce3a322c09 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1182,6 +1182,12 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
FindEntryCallback callback, Object ctx) {
+ asyncFindNewestMatching(constraint, condition, callback, ctx, false);
+ }
+
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
OpFindNewest op;
PositionImpl startPosition = null;
long max = 0;
@@ -1203,7 +1209,11 @@ public class ManagedCursorImpl implements ManagedCursor {
Optional.empty(), ctx);
return;
}
- op = new OpFindNewest(this, startPosition, condition, max, callback,
ctx);
+ if (isFindFromLedger) {
+ op = new OpFindNewest(this.ledger, startPosition, condition, max,
callback, ctx);
+ } else {
+ op = new OpFindNewest(this, startPosition, condition, max,
callback, ctx);
+ }
op.find();
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 2c01b778caf..04d99d3bdf4 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -258,6 +258,11 @@ public class ManagedCursorContainerTest {
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
}
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean
isFindFromLedger) {
+ }
+
@Override
public void asyncResetCursor(final Position position, boolean
forceReset,
AsyncCallbacks.ResetCursorCallback callback) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index d2e6f6f5ff8..08273155e4c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
entry.release();
}
return false;
- }, this, callback);
+ }, this, callback, true);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore message position find scheduled
task, last find is still running", topicName,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
index 499262c1e60..477290fc683 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -276,6 +277,11 @@ public class MockManagedCursor implements ManagedCursor {
}
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean
isFindFromLedger) {
+ }
+
@Override
public void resetCursor(Position position) throws InterruptedException,
ManagedLedgerException {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 48798f0020f..e77fd07c6ef 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -243,6 +243,39 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
factory.shutdown();
}
+ @Test
+ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
+ final String ledgerAndCursorName =
"testPersistentMessageFinderWhenLastMessageDelete";
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(10);
+ config.setMaxEntriesPerLedger(10);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+
+ ledger.addEntry(createMessageWrittenToLedger("msg1"));
+ ledger.addEntry(createMessageWrittenToLedger("msg2"));
+ ledger.addEntry(createMessageWrittenToLedger("msg3"));
+ Position lastPosition =
ledger.addEntry(createMessageWrittenToLedger("last-message"));
+
+ long endTimestamp = System.currentTimeMillis() + 1000;
+
+ Result result = new Result();
+ // delete last position message
+ cursor.delete(lastPosition);
+ CompletableFuture<Void> future = findMessage(result, cursor,
endTimestamp);
+ future.get();
+ assertNull(result.exception);
+ assertNotEquals(result.position, null);
+ assertEquals(result.position, lastPosition);
+
+ result.reset();
+ cursor.close();
+ ledger.close();
+ factory.shutdown();
+ }
+
@Test
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws
Exception {