This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 948d25128a1 [improve][broker] Optimize subscription seek (cursor
reset) by timestamp (#22792)
948d25128a1 is described below
commit 948d25128a192c606484969bd772998b1ba7fc30
Author: 道君 <[email protected]>
AuthorDate: Thu Jan 9 21:05:39 2025 +0800
[improve][broker] Optimize subscription seek (cursor reset) by timestamp
(#22792)
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 2eb4eabc84f68fef5b29d894631c7c23d06ec3af)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 25 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 56 +++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 291 +++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 18 ++
.../persistent/PersistentMessageFinder.java | 69 ++++-
.../service/persistent/PersistentSubscription.java | 19 +-
.../service/PersistentMessageFinderTest.java | 242 ++++++++++++++++-
7 files changed, 690 insertions(+), 30 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 042e0399869..4e5e1236548 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -660,6 +660,31 @@ public interface ManagedCursor {
void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
+
+ /**
+ * Find the newest entry that matches the given predicate within the given position range.
+ *
+ * @param constraint
+ * search only active entries or all entries
+ * @param condition
+ * predicate that reads an entry and applies a condition
+ * @param callback
+ * callback object returning the resultant position
+ * @param startPosition
+ * start position to search from.
+ * @param endPosition
+ * end position to search to.
+ * @param ctx
+ * opaque context
+ * @param isFindFromLedger
+ * find the newest entry from ledger
+ */
+ default void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ Position startPosition, Position endPosition,
FindEntryCallback callback,
+ Object ctx, boolean isFindFromLedger) {
+ asyncFindNewestMatching(constraint, condition, callback, ctx,
isFindFromLedger);
+ }
+
/**
* reset the cursor to specified position to enable replay of messages.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0cd9fc0d54c..cc7f009f3f6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1272,27 +1272,55 @@ public class ManagedCursorImpl implements ManagedCursor
{
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
- OpFindNewest op;
- Position startPosition = null;
- long max = 0;
+ asyncFindNewestMatching(constraint, condition, null, null, callback,
ctx,
+ isFindFromLedger);
+ }
+
+
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ Position start, Position end,
FindEntryCallback callback,
+ Object ctx, boolean isFindFromLedger) {
+ Position startPosition;
switch (constraint) {
- case SearchAllAvailableEntries:
- startPosition = getFirstPosition();
- max = ledger.getNumberOfEntries() - 1;
- break;
- case SearchActiveEntries:
- startPosition = ledger.getNextValidPosition(markDeletePosition);
- max = getNumberOfEntriesInStorage();
- break;
- default:
- callback.findEntryFailed(new ManagedLedgerException("Unknown
position constraint"), Optional.empty(), ctx);
- return;
+ case SearchAllAvailableEntries ->
+ startPosition = start == null ? getFirstPosition() :
start;
+ case SearchActiveEntries -> {
+ if (start == null) {
+ startPosition =
ledger.getNextValidPosition(markDeletePosition);
+ } else {
+ startPosition = start;
+ startPosition =
startPosition.compareTo(markDeletePosition) <= 0
+ ? ledger.getNextValidPosition(startPosition) :
startPosition;
+ }
+ }
+ default -> {
+ callback.findEntryFailed(
+ new ManagedLedgerException("Unknown position
constraint"), Optional.empty(), ctx);
+ return;
+ }
}
+ // startPosition is expected to be non-null at this point; this check is purely defensive.
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find
start position"),
Optional.empty(), ctx);
return;
}
+ // Calculate the end position
+ Position endPosition = end == null ? ledger.lastConfirmedEntry : end;
+ endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ?
ledger.lastConfirmedEntry : endPosition;
+ // Calculate the number of entries between the startPosition and
endPosition
+ long max = 0;
+ if (startPosition.compareTo(endPosition) <= 0) {
+ max = ledger.getNumberOfEntries(Range.closed(startPosition,
endPosition));
+ }
+
+ if (max <= 0) {
+ callback.findEntryComplete(null, ctx);
+ return;
+ }
+
+ OpFindNewest op;
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max,
callback, ctx);
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 69b74fcf8f5..d3ea98131ad 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4873,6 +4873,297 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}
+ @Test
+ public void
testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws
Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd",
managedLedgerConfig);
+ @Cleanup
+ ManagedCursor managedCursor = ledger.openCursor("test");
+
+ Position position = ledger.addEntry("test".getBytes(Encoding));
+ Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+ Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+ Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+
+ Predicate<Entry> condition = entry -> {
+ try {
+ Position p = entry.getPosition();
+ return p.compareTo(position1) <= 0;
+ } finally {
+ entry.release();
+ }
+ };
+
+ // find the newest entry with start and end position
+ AtomicBoolean failed = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Position> positionRef = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, position, position2, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef.set(position);
+ latch.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, true);
+
+ latch.await();
+ assertFalse(failed.get());
+ assertNotNull(positionRef.get());
+ assertEquals(positionRef.get(), position1);
+
+ // find the newest entry with start
+ AtomicBoolean failed1 = new AtomicBoolean(false);
+ CountDownLatch latch1 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef1 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, position, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef1.set(position);
+ latch1.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed1.set(true);
+ latch1.countDown();
+ }
+ }, null, true);
+ latch1.await();
+ assertFalse(failed1.get());
+ assertNotNull(positionRef1.get());
+ assertEquals(positionRef1.get(), position1);
+
+ // find the newest entry with end
+ AtomicBoolean failed2 = new AtomicBoolean(false);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef2 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, null, position2, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef2.set(position);
+ latch2.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed2.set(true);
+ latch2.countDown();
+ }
+ }, null, true);
+ latch2.await();
+ assertFalse(failed2.get());
+ assertNotNull(positionRef2.get());
+ assertEquals(positionRef2.get(), position1);
+
+ // find the newest entry without start and end position
+ AtomicBoolean failed3 = new AtomicBoolean(false);
+ CountDownLatch latch3 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef3 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef3.set(position);
+ latch3.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed3.set(true);
+ latch3.countDown();
+ }
+ }, null, true);
+ latch3.await();
+ assertFalse(failed3.get());
+ assertNotNull(positionRef3.get());
+ assertEquals(positionRef3.get(), position1);
+
+ // find position3
+ AtomicBoolean failed4 = new AtomicBoolean(false);
+ CountDownLatch latch4 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef4 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
+ try {
+ Position p = entry.getPosition();
+ return p.compareTo(position3) <= 0;
+ } finally {
+ entry.release();
+ }
+ }, position3, position3, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef4.set(position);
+ latch4.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed4.set(true);
+ latch4.countDown();
+ }
+ }, null, true);
+ latch4.await();
+ assertFalse(failed4.get());
+ assertNotNull(positionRef4.get());
+ assertEquals(positionRef4.get(), position3);
+ }
+
+
+ @Test
+ public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd()
throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd",
managedLedgerConfig);
+ @Cleanup
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl)
ledger.openCursor("test");
+
+ Position position = ledger.addEntry("test".getBytes(Encoding));
+ Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+ Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+ Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+ Position position4 = ledger.addEntry("test4".getBytes(Encoding));
+ managedCursor.markDelete(position1);
+ assertEquals(managedCursor.getNumberOfEntries(), 3);
+
+ Predicate<Entry> condition = entry -> {
+ try {
+ Position p = entry.getPosition();
+ return p.compareTo(position3) <= 0;
+ } finally {
+ entry.release();
+ }
+ };
+
+ // find the newest entry with start and end position
+ AtomicBoolean failed = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Position> positionRef = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, position2, position4, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef.set(position);
+ latch.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, true);
+ latch.await();
+ assertFalse(failed.get());
+ assertNotNull(positionRef.get());
+ assertEquals(positionRef.get(), position3);
+
+ // find the newest entry with start
+ AtomicBoolean failed1 = new AtomicBoolean(false);
+ CountDownLatch latch1 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef1 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, position2, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef1.set(position);
+ latch1.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed1.set(true);
+ latch1.countDown();
+ }
+ }, null, true);
+
+ latch1.await();
+ assertFalse(failed1.get());
+ assertNotNull(positionRef1.get());
+ assertEquals(positionRef1.get(), position3);
+
+ // find the newest entry with end
+ AtomicBoolean failed2 = new AtomicBoolean(false);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef2 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, null, position4, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef2.set(position);
+ latch2.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed2.set(true);
+ latch2.countDown();
+ }
+ }, null, true);
+
+ latch2.await();
+ assertFalse(failed2.get());
+ assertNotNull(positionRef2.get());
+ assertEquals(positionRef2.get(), position3);
+
+ // find the newest entry without start and end position
+ AtomicBoolean failed3 = new AtomicBoolean(false);
+ CountDownLatch latch3 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef3 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef3.set(position);
+ latch3.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed3.set(true);
+ latch3.countDown();
+ }
+ }, null, true);
+ latch3.await();
+ assertFalse(failed3.get());
+ assertNotNull(positionRef3.get());
+ assertEquals(positionRef3.get(), position3);
+
+ // find position4
+ AtomicBoolean failed4 = new AtomicBoolean(false);
+ CountDownLatch latch4 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef4 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
entry -> {
+ try {
+ Position p = entry.getPosition();
+ return p.compareTo(position4) <= 0;
+ } finally {
+ entry.release();
+ }
+ }, position4, position4, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef4.set(position);
+ latch4.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed4.set(true);
+ latch4.countDown();
+ }
+ }, null, true);
+ latch4.await();
+ assertFalse(failed4.get());
+ assertNotNull(positionRef4.get());
+ assertEquals(positionRef4.get(), position4);
+ }
+
@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0f7ae00713d..951192d94a9 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2233,6 +2233,24 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Max time before triggering a rollover on a cursor ledger"
)
private int managedLedgerCursorRolloverTimeInSeconds = 14400;
+
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ dynamic = true,
+ doc = "When resetting a subscription by timestamp, the broker will
use the"
+ + " ledger closing timestamp metadata to determine the
range of ledgers"
+ + " to search for the message where the subscription
position is reset to. "
+ + " Since by default, the search condition is based on the
message publish time provided by the "
+ + " client at the publish time, there will be some clock
skew between the ledger closing timestamp "
+ + " metadata and the publish time."
+ + " This configuration is used to set the max clock skew
between the ledger closing"
+ + " timestamp and the message publish time for finding the
range of ledgers to open for searching."
+ + " The default value is 60000 milliseconds (60 seconds).
When set to -1, the broker will not"
+ + " use the ledger closing timestamp metadata to determine
the range of ledgers to search for the"
+ + " message."
+ )
+ private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis
= 60000;
+
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of `acknowledgment holes` that are going to be
persistently stored.\n\n"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 08273155e4c..5a4631cf205 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -25,6 +25,9 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
+ private final int ledgerCloseTimestampMaxClockSkewMillis;
private final String topicName;
private long timestamp = 0;
@@ -48,19 +52,23 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageFinder.class,
"messageFindInProgress");
- public PersistentMessageFinder(String topicName, ManagedCursor cursor) {
+ public PersistentMessageFinder(String topicName, ManagedCursor cursor, int
ledgerCloseTimestampMaxClockSkewMillis) {
this.topicName = topicName;
this.cursor = cursor;
this.subName = Codec.decode(cursor.getName());
+ this.ledgerCloseTimestampMaxClockSkewMillis =
ledgerCloseTimestampMaxClockSkewMillis;
}
public void findMessages(final long timestamp,
AsyncCallbacks.FindEntryCallback callback) {
- this.timestamp = timestamp;
if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+ this.timestamp = timestamp;
if (log.isDebugEnabled()) {
log.debug("[{}] Starting message position find at timestamp
{}", subName, timestamp);
}
-
+ Pair<Position, Position> range =
+
getFindPositionRange(cursor.getManagedLedger().getLedgersInfo().values(),
+ cursor.getManagedLedger().getLastConfirmedEntry(),
timestamp,
+ ledgerCloseTimestampMaxClockSkewMillis);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
try {
long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -71,7 +79,7 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
entry.release();
}
return false;
- }, this, callback, true);
+ }, range.getLeft(), range.getRight(), this, callback, true);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore message position find scheduled
task, last find is still running", topicName,
@@ -83,6 +91,59 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
}
}
+ public static Pair<Position, Position>
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
+ Position
lastConfirmedEntry, long targetTimestamp,
+ int
ledgerCloseTimestampMaxClockSkewMillis) {
+ if (ledgerCloseTimestampMaxClockSkewMillis < 0) {
+ // this feature is disabled when the value is negative
+ return Pair.of(null, null);
+ }
+
+ long targetTimestampMin = targetTimestamp -
ledgerCloseTimestampMaxClockSkewMillis;
+ long targetTimestampMax = targetTimestamp +
ledgerCloseTimestampMaxClockSkewMillis;
+
+ Position start = null;
+ Position end = null;
+
+ LedgerInfo secondToLastLedgerInfo = null;
+ LedgerInfo lastLedgerInfo = null;
+ for (LedgerInfo info : ledgerInfos) {
+ if (!info.hasTimestamp()) {
+ // unexpected case, don't set start and end
+ return Pair.of(null, null);
+ }
+ secondToLastLedgerInfo = lastLedgerInfo;
+ lastLedgerInfo = info;
+ long closeTimestamp = info.getTimestamp();
+ // For an open ledger, closeTimestamp is 0
+ if (closeTimestamp == 0) {
+ end = null;
+ break;
+ }
+ if (closeTimestamp <= targetTimestampMin) {
+ start = PositionFactory.create(info.getLedgerId(), 0);
+ } else if (closeTimestamp > targetTimestampMax) {
+ // If the close timestamp is greater than the target timestamp plus the max clock skew, end the search at this ledger's last entry
+ end = PositionFactory.create(info.getLedgerId(),
info.getEntries() - 1);
+ break;
+ }
+ }
+ // If the second-to-last ledger's close timestamp is less than the
target timestamp minus the max clock skew, then start from the
+ // first entry of the last ledger when there are confirmed entries in
that ledger; otherwise start from the last confirmed entry
+ if (lastLedgerInfo != null && secondToLastLedgerInfo != null
+ && secondToLastLedgerInfo.getTimestamp() > 0
+ && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin)
{
+ Position firstPositionInLedger =
PositionFactory.create(lastLedgerInfo.getLedgerId(), 0);
+ if (lastConfirmedEntry != null
+ && lastConfirmedEntry.compareTo(firstPositionInLedger) >=
0) {
+ start = firstPositionInLedger;
+ } else {
+ start = lastConfirmedEntry;
+ }
+ }
+ return Pair.of(start, end);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentMessageFinder.class);
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8cebbd52695..609bbd0d734 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -134,6 +134,7 @@ public class PersistentSubscription extends
AbstractSubscription {
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
private volatile Boolean replicatedControlled;
+ private final ServiceConfiguration config;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ?
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
@@ -152,6 +153,7 @@ public class PersistentSubscription extends
AbstractSubscription {
public PersistentSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String>
subscriptionProperties) {
this.topic = topic;
+ this.config = topic.getBrokerService().getPulsar().getConfig();
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
@@ -162,7 +164,7 @@ public class PersistentSubscription extends
AbstractSubscription {
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
- if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+ if (config.isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(TopicName.get(topicName))
&& !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -199,7 +201,6 @@ public class PersistentSubscription extends
AbstractSubscription {
public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
- ServiceConfiguration config =
topic.getBrokerService().getPulsar().getConfig();
if (!replicated || !config.isEnableReplicatedSubscriptions()) {
this.replicatedSubscriptionSnapshotCache = null;
@@ -257,7 +258,6 @@ public class PersistentSubscription extends
AbstractSubscription {
case Shared:
if (dispatcher == null || dispatcher.getType() !=
SubType.Shared) {
previousDispatcher = dispatcher;
- ServiceConfiguration config =
topic.getBrokerService().getPulsar().getConfig();
if
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
@@ -286,7 +286,6 @@ public class PersistentSubscription extends
AbstractSubscription {
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
- ServiceConfiguration config =
topic.getBrokerService().getPulsar().getConfig();
if
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
@@ -422,7 +421,7 @@ public class PersistentSubscription extends
AbstractSubscription {
log.debug("[{}][{}] Individual acks on {}", topicName,
subName, positions);
}
cursor.asyncDelete(positions, deleteCallback,
previousMarkDeletePosition);
- if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled())
{
+ if (config.isTransactionCoordinatorEnabled()) {
positions.forEach(position -> {
if ((cursor.isMessageDeleted(position))) {
pendingAckHandle.clearIndividualPosition(position);
@@ -598,10 +597,9 @@ public class PersistentSubscription extends
AbstractSubscription {
final EntryFilterSupport entryFilterSupport = dispatcher != null
? (EntryFilterSupport) dispatcher : new
EntryFilterSupport(this);
// we put some hard limits on the scan, in order to prevent denial of
services
- ServiceConfiguration configuration =
topic.getBrokerService().getPulsar().getConfiguration();
- long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
- long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
- int batchSize = configuration.getDispatcherMaxReadBatchSize();
+ long maxEntries = config.getSubscriptionBacklogScanMaxEntries();
+ long timeOutMs = config.getSubscriptionBacklogScanMaxTimeMs();
+ int batchSize = config.getDispatcherMaxReadBatchSize();
AtomicReference<Position> firstPosition = new AtomicReference<>();
AtomicReference<Position> lastPosition = new AtomicReference<>();
final Predicate<Entry> condition = entry -> {
@@ -776,7 +774,8 @@ public class PersistentSubscription extends
AbstractSubscription {
@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
CompletableFuture<Void> future = new CompletableFuture<>();
- PersistentMessageFinder persistentMessageFinder = new
PersistentMessageFinder(topicName, cursor);
+ PersistentMessageFinder persistentMessageFinder = new
PersistentMessageFinder(topicName, cursor,
+
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Resetting subscription to timestamp {}",
topicName, subName, timestamp);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 176a799292a..6f2f1f3a1a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -138,7 +139,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
CompletableFuture<Void> findMessage(final Result result, final
ManagedCursor c1, final long timestamp) {
- PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1);
+ PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1, 0);
final CompletableFuture<Void> future = new CompletableFuture<>();
messageFinder.findMessages(timestamp, new
AsyncCallbacks.FindEntryCallback() {
@@ -217,7 +218,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNotEquals(result.position, null);
assertEquals(result.position, lastPosition);
- PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1);
+ PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1, 0);
final AtomicBoolean ex = new AtomicBoolean(false);
messageFinder.findEntryFailed(new ManagedLedgerException("failed"),
Optional.empty(),
new AsyncCallbacks.FindEntryCallback() {
@@ -589,4 +590,241 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
resetCursorData.setExcluded(true);
System.out.println(Entity.entity(resetCursorData,
MediaType.APPLICATION_JSON));
}
+
+ @Test
+ public void testGetFindPositionRange_EmptyLedgerInfos() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+ Position lastConfirmedEntry = null;
+ long targetTimestamp = 2000;
+ Pair<Position, Position> range =
+ PersistentMessageFinder.getFindPositionRange(ledgerInfos,
lastConfirmedEntry, targetTimestamp, 0);
+ // No ledgers and no last confirmed entry: a pair is still returned, but both bounds must be null.
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_AllTimestampsLessThanTarget() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+ Position lastConfirmedEntry = PositionFactory.create(2, 9);
+ // Target (2000) is later than the timestamp of every ledger (1000 and 1500).
+ long targetTimestamp = 2000;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect only a left bound, at the first entry of the last ledger (2, 0); the right bound stays null.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+ }
+
+ @Test
+ public void testGetFindPositionRange_LastTimestampIsZero() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(3, 5);
+ // Ledger 3 carries timestamp 0 and the last confirmed entry (3, 5) lies inside it; target is 2000.
+ long targetTimestamp = 2000;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect the left bound at the first entry of ledger 3 and no right bound.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+ }
+
+ @Test
+ public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(2, 9);
+ // NOTE(review): ledger 3 still declares 10 entries; "no entries" is modelled by lastConfirmedEntry (2, 9) - confirm.
+ long targetTimestamp = 2000;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect the left bound to be the last confirmed entry itself, (2, 9), and no right bound.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(2, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_AllTimestampsGreaterThanTarget() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(4000).build());
+ Position lastConfirmedEntry = PositionFactory.create(2, 9);
+ // Target (2000) is earlier than the timestamp of every ledger (3000 and 4000).
+ long targetTimestamp = 2000;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect no left bound; the right bound is the last entry of the first ledger, (1, 9).
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getRight(), PositionFactory.create(1, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_MixedTimestamps() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+ Position lastConfirmedEntry = PositionFactory.create(3, 9);
+ // Target (2500) falls between the timestamps of ledger 2 (2000) and ledger 3 (3000).
+ long targetTimestamp = 2500;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect the range to cover exactly ledger 3: from (3, 0) to its last entry (3, 9).
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+ assertEquals(range.getRight(), PositionFactory.create(3, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_TimestampAtBoundary() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+ Position lastConfirmedEntry = PositionFactory.create(4, 9);
+ // Target (3000) is exactly the timestamp recorded for ledger 3.
+ long targetTimestamp = 3000;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect the range to start at (3, 0) and to extend through the last entry of ledger 4.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+ // there might be entries in the next ledger with the same timestamp as the target timestamp, even though
+ // the close timestamp of ledger 3 is equal to the target timestamp
+ assertEquals(range.getRight(), PositionFactory.create(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkew() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(5, 5);
+ // Ledgers 2 (2000) and 3 (2010) are both within 10 of the target; 10 is the last argument (presumably max skew ms).
+ long targetTimestamp = 2009;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+ // Expect a widened range: left bound at (1, 0), right bound at the last entry of ledger 4.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(1, 0));
+ assertEquals(range.getRight(), PositionFactory.create(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkewCase2() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(5, 5);
+ // Target 2995 is 5 below ledger 3's timestamp (3000), i.e. inside the tolerance of 10 passed as last argument.
+ long targetTimestamp = 2995;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+ // Expect left bound (2, 0) and right bound (4, 9), the last entry of ledger 4.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+ assertEquals(range.getRight(), PositionFactory.create(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkewCase3() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(5, 5);
+ // Target 3005 is 5 above ledger 3's timestamp (3000), i.e. inside the tolerance of 10 passed as last argument.
+ long targetTimestamp = 3005;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+ // Expect left bound (2, 0) and right bound (4, 9), the last entry of ledger 4.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
+ assertEquals(range.getRight(), PositionFactory.create(4, 9));
+ }
+
+ @Test
+ public void
testGetFindPositionRange_FeatureDisabledWithNegativeClockSkew() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(5, 5);
+ // Same ledgers and target as testGetFindPositionRange_ClockSkew, but the last argument is negative (-1).
+ long targetTimestamp = 2009;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, -1);
+ // Expect no bounds at all; per the test name, a negative value disables the range computation.
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_SingleLedger() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setTimestamp(0).build());
+ Position lastConfirmedEntry = PositionFactory.create(1, 5);
+ // The only ledger has timestamp 0 and no entry count set; the last confirmed entry is (1, 5).
+ long targetTimestamp = 2500;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect neither bound to be set.
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_SingleClosedLedger() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+ Position lastConfirmedEntry = PositionFactory.create(1, 9);
+ // The only ledger has timestamp 1000, which is earlier than the target (2500).
+ long targetTimestamp = 2500;
+ Pair<Position, Position> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+ // Expect a left bound at the first entry of that ledger, (1, 0), and no right bound.
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), PositionFactory.create(1, 0));
+ }
}