This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c66167be55e [improve][ml] Filter out deleted entries before read entries from ledger. (#21739)
c66167be55e is described below

commit c66167be55e9ed14261174a672952136c6fdb441
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 19 14:38:14 2024 +0800

    [improve][ml] Filter out deleted entries before read entries from ledger. (#21739)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   4 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   2 +-
 .../mledger/impl/ReadOnlyCursorImpl.java           |   6 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 232 +++++++++++++++++++--
 4 files changed, 227 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 060138f491e..38b142aca37 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -791,6 +791,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
 
         PENDING_READ_OPS_UPDATER.incrementAndGet(this);
+        // Skip deleted entries.
+        skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
         OpReadEntry op =
                 OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
         ledger.asyncReadEntries(op);
@@ -949,6 +951,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                     maxPosition, skipCondition);
         } else {
+            // Skip deleted entries.
+            skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
             OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                     ctx, maxPosition, skipCondition);
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 569776edccf..c839ee6f77c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4539,4 +4539,4 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
         return theSlowestNonDurableReadPosition;
     }
-}
+}
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 9102339b290..1661613f07d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -70,4 +71,9 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCur
     public long getNumberOfEntries(Range<PositionImpl> range) {
         return this.ledger.getNumberOfEntries(range);
     }
+
+    @Override
+    public boolean isMessageDeleted(Position position) {
+        return false;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 627ae73d928..644f53c3a52 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -65,6 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
@@ -72,6 +74,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -766,7 +769,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     @Test(timeOut = 20000)
     void testResetCursor1() throws Exception {
         ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
-            new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
         ManagedCursor cursor = ledger.openCursor("trc1");
         PositionImpl actualEarliest = (PositionImpl) 
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         ledger.addEntry("dummy-entry-2".getBytes(Encoding));
@@ -2286,7 +2289,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
         assertNull(c1.findNewestMatching(
-            entry -> Arrays.equals(entry.getDataAndRelease(), 
"expired".getBytes(Encoding))));
+                entry -> Arrays.equals(entry.getDataAndRelease(), 
"expired".getBytes(Encoding))));
     }
 
     @Test(timeOut = 20000)
@@ -2595,7 +2598,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
             @Override
             public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
-                    Object ctx) {
+                                        Object ctx) {
                 result.exception = exception;
                 counter.countDown();
             }
@@ -2621,7 +2624,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     }
 
     void internalTestFindNewestMatchingAllEntries(final String name, final int 
entriesPerLedger,
-            final int expectedEntryId) throws Exception {
+                                                  final int expectedEntryId) 
throws Exception {
         final String ledgerAndCursorName = name;
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionSizeInMB(10);
@@ -2715,7 +2718,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertTrue((Arrays.equals(entries.get(0).getData(), 
"entry1".getBytes(Encoding))
                 && Arrays.equals(entries.get(1).getData(), 
"entry3".getBytes(Encoding)))
                 || (Arrays.equals(entries.get(0).getData(), 
"entry3".getBytes(Encoding))
-                        && Arrays.equals(entries.get(1).getData(), 
"entry1".getBytes(Encoding))));
+                && Arrays.equals(entries.get(1).getData(), 
"entry1".getBytes(Encoding))));
         entries.forEach(Entry::release);
 
         // 3. Fail on reading non-existing position
@@ -3142,7 +3145,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         try {
             bkc.openLedgerNoRecovery(ledgerId, 
DigestType.fromApiDigestType(mlConfig.getDigestType()),
-                                     mlConfig.getPassword());
+                    mlConfig.getPassword());
             fail("ledger should have deleted due to update-cursor failure");
         } catch (BKException e) {
             // ok
@@ -3761,17 +3764,17 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         pos.ackSet = bitSet.toLongArray();
 
         cursor.asyncDelete(pos,
-            new DeleteCallback() {
-                @Override
-                public void deleteComplete(Object ctx) {
-                    latch.countDown();
-                }
+                new DeleteCallback() {
+                    @Override
+                    public void deleteComplete(Object ctx) {
+                        latch.countDown();
+                    }
 
-                @Override
-                public void deleteFailed(ManagedLedgerException exception, 
Object ctx) {
-                    latch.countDown();
-                }
-            }, null);
+                    @Override
+                    public void deleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                        latch.countDown();
+                    }
+                }, null);
         latch.await();
         pos.ackSet = null;
     }
@@ -4484,5 +4487,202 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+
+    @Test
+    public void testReadEntriesWithSkipDeletedEntries() throws Exception {
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testReadEntriesWithSkipDeletedEntries");
+        ledger = Mockito.spy(ledger);
+        List<Long> actualReadEntryIds = new ArrayList<>();
+        Mockito.doAnswer(inv -> {
+                    long start = inv.getArgument(1);
+                    long end = inv.getArgument(2);
+                    for (long i = start; i <= end; i++) {
+                        actualReadEntryIds.add(i);
+                    }
+                    return inv.callRealMethod();
+                })
+                .when(ledger)
+                .asyncReadEntry(Mockito.any(ReadHandle.class), 
Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+        @Cleanup
+        ManagedCursor cursor = ledger.openCursor("c");
+
+        int entries = 20;
+        Position maxReadPosition = null;
+        Map<Integer, Position> map = new HashMap<>();
+        for (int i = 0; i < entries; i++) {
+            maxReadPosition = ledger.addEntry(new byte[1024]);
+            map.put(i, maxReadPosition);
+        }
+
+
+        Set<Position> deletedPositions = new HashSet<>();
+        deletedPositions.add(map.get(1));
+        deletedPositions.add(map.get(4));
+        deletedPositions.add(map.get(5));
+        deletedPositions.add(map.get(8));
+        deletedPositions.add(map.get(9));
+        deletedPositions.add(map.get(10));
+        deletedPositions.add(map.get(15));
+        deletedPositions.add(map.get(17));
+        deletedPositions.add(map.get(19));
+        cursor.delete(deletedPositions);
+
+        CompletableFuture<Void> f0 = new CompletableFuture<>();
+        List<Entry> readEntries = new ArrayList<>();
+        cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntries.addAll(entries);
+                f0.complete(null);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f0.completeExceptionally(exception);
+            }
+        }, null, PositionImpl.get(maxReadPosition.getLedgerId(), 
maxReadPosition.getEntryId()).getNext());
+
+        f0.get();
+
+        CompletableFuture<Void> f1 = new CompletableFuture<>();
+        cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntries.addAll(entries);
+                f1.complete(null);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f1.completeExceptionally(exception);
+            }
+        }, null, PositionImpl.get(maxReadPosition.getLedgerId(), 
maxReadPosition.getEntryId()).getNext());
+
+
+        f1.get();
+        CompletableFuture<Void> f2 = new CompletableFuture<>();
+        cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntries.addAll(entries);
+                f2.complete(null);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f2.completeExceptionally(exception);
+            }
+        }, null, PositionImpl.get(maxReadPosition.getLedgerId(), 
maxReadPosition.getEntryId()).getNext());
+
+        f2.get();
+
+        Position cursorReadPosition = cursor.getReadPosition();
+        Position expectReadPosition = maxReadPosition.getNext();
+        assertTrue(cursorReadPosition.getLedgerId() == 
expectReadPosition.getLedgerId()
+                && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
+
+        assertEquals(readEntries.size(), actualReadEntryIds.size());
+        assertEquals(entries - deletedPositions.size(), 
actualReadEntryIds.size());
+        for (Entry entry : readEntries) {
+            long entryId = entry.getEntryId();
+            assertTrue(actualReadEntryIds.contains(entryId));
+        }
+    }
+
+
+    @Test
+    public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() 
throws Exception {
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)
+                
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
+        ledger = Mockito.spy(ledger);
+
+        List<Long> actualReadEntryIds = new ArrayList<>();
+        Mockito.doAnswer(inv -> {
+                    long start = inv.getArgument(1);
+                    long end = inv.getArgument(2);
+                    for (long i = start; i <= end; i++) {
+                        actualReadEntryIds.add(i);
+                    }
+                    return inv.callRealMethod();
+                })
+                .when(ledger)
+                .asyncReadEntry(Mockito.any(ReadHandle.class), 
Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+        @Cleanup
+        ManagedCursor cursor = ledger.openCursor("c");
+
+        int entries = 20;
+        Position maxReadPosition0 = null;
+        Map<Integer, Position> map = new HashMap<>();
+        for (int i = 0; i < entries; i++) {
+            maxReadPosition0 = ledger.addEntry(new byte[1024]);
+            map.put(i, maxReadPosition0);
+        }
+
+        PositionImpl maxReadPosition =
+                PositionImpl.get(maxReadPosition0.getLedgerId(), 
maxReadPosition0.getEntryId()).getNext();
+
+        Set<Position> deletedPositions = new HashSet<>();
+        deletedPositions.add(map.get(1));
+        deletedPositions.add(map.get(3));
+        deletedPositions.add(map.get(5));
+        cursor.delete(deletedPositions);
+
+        Set<Long> skippedPositions = new HashSet<>();
+        skippedPositions.add(map.get(6).getEntryId());
+        skippedPositions.add(map.get(7).getEntryId());
+        skippedPositions.add(map.get(8).getEntryId());
+        skippedPositions.add(map.get(11).getEntryId());
+        skippedPositions.add(map.get(15).getEntryId());
+        skippedPositions.add(map.get(16).getEntryId());
+
+        Predicate<PositionImpl> skipCondition = position -> 
skippedPositions.contains(position.getEntryId());
+        List<Entry> readEntries = new ArrayList<>();
+
+        CompletableFuture<Void> f0 = new CompletableFuture<>();
+        cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntries.addAll(entries);
+                f0.complete(null);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f0.completeExceptionally(exception);
+            }
+        }, null, maxReadPosition, skipCondition);
+
+        f0.get();
+        CompletableFuture<Void> f1 = new CompletableFuture<>();
+        cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntries.addAll(entries);
+                f1.complete(null);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                f1.completeExceptionally(exception);
+            }
+        }, null, maxReadPosition, skipCondition);
+        f1.get();
+
+
+        assertEquals(actualReadEntryIds.size(), readEntries.size());
+        assertEquals(entries - deletedPositions.size() - 
skippedPositions.size(), actualReadEntryIds.size());
+        for (Entry entry : readEntries) {
+            long entryId = entry.getEntryId();
+            assertTrue(actualReadEntryIds.contains(entryId));
+        }
+
+        Position cursorReadPosition = cursor.getReadPosition();
+        Position expectReadPosition = maxReadPosition;
+        assertTrue(cursorReadPosition.getLedgerId() == 
expectReadPosition.getLedgerId()
+                && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }

Reply via email to