This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c66167be55e [improve][ml] Filter out deleted entries before read
entries from ledger. (#21739)
c66167be55e is described below
commit c66167be55e9ed14261174a672952136c6fdb441
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 19 14:38:14 2024 +0800
[improve][ml] Filter out deleted entries before read entries from ledger.
(#21739)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../mledger/impl/ReadOnlyCursorImpl.java | 6 +
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 232 +++++++++++++++++++--
4 files changed, 227 insertions(+), 17 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 060138f491e..38b142aca37 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -791,6 +791,8 @@ public class ManagedCursorImpl implements ManagedCursor {
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead,
maxSizeBytes);
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
+ // Skip deleted entries.
+ skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
@@ -949,6 +951,8 @@ public class ManagedCursorImpl implements ManagedCursor {
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT,
callback, ctx,
maxPosition, skipCondition);
} else {
+ // Skip deleted entries.
+ skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 569776edccf..c839ee6f77c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4539,4 +4539,4 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
return theSlowestNonDurableReadPosition;
}
-}
+}
\ No newline at end of file
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 9102339b290..1661613f07d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -70,4 +71,9 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl
implements ReadOnlyCur
public long getNumberOfEntries(Range<PositionImpl> range) {
return this.ledger.getNumberOfEntries(range);
}
+
+ @Override
+ public boolean isMessageDeleted(Position position) {
+ return false;
+ }
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 627ae73d928..644f53c3a52 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -65,6 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
@@ -72,6 +74,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -766,7 +769,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Test(timeOut = 20000)
void testResetCursor1() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
- new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("trc1");
PositionImpl actualEarliest = (PositionImpl)
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
@@ -2286,7 +2289,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
assertNull(c1.findNewestMatching(
- entry -> Arrays.equals(entry.getDataAndRelease(),
"expired".getBytes(Encoding))));
+ entry -> Arrays.equals(entry.getDataAndRelease(),
"expired".getBytes(Encoding))));
}
@Test(timeOut = 20000)
@@ -2595,7 +2598,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Override
public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition,
- Object ctx) {
+ Object ctx) {
result.exception = exception;
counter.countDown();
}
@@ -2621,7 +2624,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}
void internalTestFindNewestMatchingAllEntries(final String name, final int
entriesPerLedger,
- final int expectedEntryId) throws Exception {
+ final int expectedEntryId)
throws Exception {
final String ledgerAndCursorName = name;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
@@ -2715,7 +2718,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertTrue((Arrays.equals(entries.get(0).getData(),
"entry1".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(),
"entry3".getBytes(Encoding)))
|| (Arrays.equals(entries.get(0).getData(),
"entry3".getBytes(Encoding))
- && Arrays.equals(entries.get(1).getData(),
"entry1".getBytes(Encoding))));
+ && Arrays.equals(entries.get(1).getData(),
"entry1".getBytes(Encoding))));
entries.forEach(Entry::release);
// 3. Fail on reading non-existing position
@@ -3142,7 +3145,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
try {
bkc.openLedgerNoRecovery(ledgerId,
DigestType.fromApiDigestType(mlConfig.getDigestType()),
- mlConfig.getPassword());
+ mlConfig.getPassword());
fail("ledger should have deleted due to update-cursor failure");
} catch (BKException e) {
// ok
@@ -3761,17 +3764,17 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
pos.ackSet = bitSet.toLongArray();
cursor.asyncDelete(pos,
- new DeleteCallback() {
- @Override
- public void deleteComplete(Object ctx) {
- latch.countDown();
- }
+ new DeleteCallback() {
+ @Override
+ public void deleteComplete(Object ctx) {
+ latch.countDown();
+ }
- @Override
- public void deleteFailed(ManagedLedgerException exception,
Object ctx) {
- latch.countDown();
- }
- }, null);
+ @Override
+ public void deleteFailed(ManagedLedgerException exception,
Object ctx) {
+ latch.countDown();
+ }
+ }, null);
latch.await();
pos.ackSet = null;
}
@@ -4484,5 +4487,202 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+
+ @Test
+ public void testReadEntriesWithSkipDeletedEntries() throws Exception {
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testReadEntriesWithSkipDeletedEntries");
+ ledger = Mockito.spy(ledger);
+ List<Long> actualReadEntryIds = new ArrayList<>();
+ Mockito.doAnswer(inv -> {
+ long start = inv.getArgument(1);
+ long end = inv.getArgument(2);
+ for (long i = start; i <= end; i++) {
+ actualReadEntryIds.add(i);
+ }
+ return inv.callRealMethod();
+ })
+ .when(ledger)
+ .asyncReadEntry(Mockito.any(ReadHandle.class),
Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+ @Cleanup
+ ManagedCursor cursor = ledger.openCursor("c");
+
+ int entries = 20;
+ Position maxReadPosition = null;
+ Map<Integer, Position> map = new HashMap<>();
+ for (int i = 0; i < entries; i++) {
+ maxReadPosition = ledger.addEntry(new byte[1024]);
+ map.put(i, maxReadPosition);
+ }
+
+
+ Set<Position> deletedPositions = new HashSet<>();
+ deletedPositions.add(map.get(1));
+ deletedPositions.add(map.get(4));
+ deletedPositions.add(map.get(5));
+ deletedPositions.add(map.get(8));
+ deletedPositions.add(map.get(9));
+ deletedPositions.add(map.get(10));
+ deletedPositions.add(map.get(15));
+ deletedPositions.add(map.get(17));
+ deletedPositions.add(map.get(19));
+ cursor.delete(deletedPositions);
+
+ CompletableFuture<Void> f0 = new CompletableFuture<>();
+ List<Entry> readEntries = new ArrayList<>();
+ cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntries.addAll(entries);
+ f0.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ f0.completeExceptionally(exception);
+ }
+ }, null, PositionImpl.get(maxReadPosition.getLedgerId(),
maxReadPosition.getEntryId()).getNext());
+
+ f0.get();
+
+ CompletableFuture<Void> f1 = new CompletableFuture<>();
+ cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntries.addAll(entries);
+ f1.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ f1.completeExceptionally(exception);
+ }
+ }, null, PositionImpl.get(maxReadPosition.getLedgerId(),
maxReadPosition.getEntryId()).getNext());
+
+
+ f1.get();
+ CompletableFuture<Void> f2 = new CompletableFuture<>();
+ cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntries.addAll(entries);
+ f2.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ f2.completeExceptionally(exception);
+ }
+ }, null, PositionImpl.get(maxReadPosition.getLedgerId(),
maxReadPosition.getEntryId()).getNext());
+
+ f2.get();
+
+ Position cursorReadPosition = cursor.getReadPosition();
+ Position expectReadPosition = maxReadPosition.getNext();
+ assertTrue(cursorReadPosition.getLedgerId() ==
expectReadPosition.getLedgerId()
+ && cursorReadPosition.getEntryId() ==
expectReadPosition.getEntryId());
+
+ assertEquals(readEntries.size(), actualReadEntryIds.size());
+ assertEquals(entries - deletedPositions.size(),
actualReadEntryIds.size());
+ for (Entry entry : readEntries) {
+ long entryId = entry.getEntryId();
+ assertTrue(actualReadEntryIds.contains(entryId));
+ }
+ }
+
+
+ @Test
+ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions()
throws Exception {
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
+
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
+ ledger = Mockito.spy(ledger);
+
+ List<Long> actualReadEntryIds = new ArrayList<>();
+ Mockito.doAnswer(inv -> {
+ long start = inv.getArgument(1);
+ long end = inv.getArgument(2);
+ for (long i = start; i <= end; i++) {
+ actualReadEntryIds.add(i);
+ }
+ return inv.callRealMethod();
+ })
+ .when(ledger)
+ .asyncReadEntry(Mockito.any(ReadHandle.class),
Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+ @Cleanup
+ ManagedCursor cursor = ledger.openCursor("c");
+
+ int entries = 20;
+ Position maxReadPosition0 = null;
+ Map<Integer, Position> map = new HashMap<>();
+ for (int i = 0; i < entries; i++) {
+ maxReadPosition0 = ledger.addEntry(new byte[1024]);
+ map.put(i, maxReadPosition0);
+ }
+
+ PositionImpl maxReadPosition =
+ PositionImpl.get(maxReadPosition0.getLedgerId(),
maxReadPosition0.getEntryId()).getNext();
+
+ Set<Position> deletedPositions = new HashSet<>();
+ deletedPositions.add(map.get(1));
+ deletedPositions.add(map.get(3));
+ deletedPositions.add(map.get(5));
+ cursor.delete(deletedPositions);
+
+ Set<Long> skippedPositions = new HashSet<>();
+ skippedPositions.add(map.get(6).getEntryId());
+ skippedPositions.add(map.get(7).getEntryId());
+ skippedPositions.add(map.get(8).getEntryId());
+ skippedPositions.add(map.get(11).getEntryId());
+ skippedPositions.add(map.get(15).getEntryId());
+ skippedPositions.add(map.get(16).getEntryId());
+
+ Predicate<PositionImpl> skipCondition = position ->
skippedPositions.contains(position.getEntryId());
+ List<Entry> readEntries = new ArrayList<>();
+
+ CompletableFuture<Void> f0 = new CompletableFuture<>();
+ cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntries.addAll(entries);
+ f0.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ f0.completeExceptionally(exception);
+ }
+ }, null, maxReadPosition, skipCondition);
+
+ f0.get();
+ CompletableFuture<Void> f1 = new CompletableFuture<>();
+ cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntries.addAll(entries);
+ f1.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ f1.completeExceptionally(exception);
+ }
+ }, null, maxReadPosition, skipCondition);
+ f1.get();
+
+
+ assertEquals(actualReadEntryIds.size(), readEntries.size());
+ assertEquals(entries - deletedPositions.size() -
skippedPositions.size(), actualReadEntryIds.size());
+ for (Entry entry : readEntries) {
+ long entryId = entry.getEntryId();
+ assertTrue(actualReadEntryIds.contains(entryId));
+ }
+
+ Position cursorReadPosition = cursor.getReadPosition();
+ Position expectReadPosition = maxReadPosition;
+ assertTrue(cursorReadPosition.getLedgerId() ==
expectReadPosition.getLedgerId()
+ && cursorReadPosition.getEntryId() ==
expectReadPosition.getEntryId());
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}