coderzc commented on code in PR #21739:
URL: https://github.com/apache/pulsar/pull/21739#discussion_r1429787167


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2088,44 +2094,132 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
             return;
         }
 
-        long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+        Predicate<PositionImpl> skipCond = opReadEntry.skipCondition;
+        if (skipCond == null) {
+            long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
+                        lastEntry);
+            }
+            asyncReadEntry(
+                    ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx);
+            return;
+        }
 
-        // Filer out and skip unnecessary read entry
-        if (opReadEntry.skipCondition != null) {
-            long firstValidEntry = -1L;
-            long lastValidEntry = -1L;
-            long entryId = firstEntry;
-            for (; entryId <= lastEntry; entryId++) {
-                if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
-                    if (firstValidEntry != -1L) {
-                        break;
-                    }
-                } else {
-                    if (firstValidEntry == -1L) {
-                        firstValidEntry = entryId;
-                    }
+        // Try to read entries in the current ledger what we need through a single `entryIdSet` as much as possible.
+        long entryId = firstEntry;
+        int count = 0;
+        SortedSet<Long> entryIds = new TreeSet<>();
+        int entriesToRead = opReadEntry.getNumberOfEntriesToRead();
+        while (entryId <= lastEntryInLedger || count <= entriesToRead) {
+            PositionImpl position = PositionImpl.get(ledger.getId(), entryId);
+            if (!skipCond.test(position)) {
+                entryIds.add(entryId);
+                count++;
+            }
+            entryId++;
+        }
+        asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx);
+    }
 
-                    lastValidEntry = entryId;
+
+    private void asyncReadEntry(ReadHandle ledger, SortedSet<Long> entryIds, OpReadEntry opReadEntry, Object ctx) {
+        if (entryIds.isEmpty()) {
+            // If the entryIds is empty, should not move the `readPosition` of `cursor`.
+            // OpReadEntry#internalReadEntriesComplete will move the `readPosition` of `cursor`
+            // to the next position of `lastEntry`, so here uses the previous position of `readPosition`
+            // to offset the impact of OpReadEntry#internalReadEntriesComplete.
+            PositionImpl previous = this.getPreviousPosition(opReadEntry.readPosition);
+            opReadEntry.internalReadEntriesComplete(Collections.emptyList(), ctx, previous);
+            return;

Review Comment:
   Why not move the readPosition when all messages are filtered out? Leaving it in place may cause the read to get stuck in an infinite loop.
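   A minimal sketch of the behavior this comment suggests, under simplifying assumptions: entry IDs stand in for `PositionImpl`, a `LongPredicate` stands in for `skipCondition`, and `FilteredReadSketch`, `scan` and `ScanResult` are hypothetical names, not Pulsar APIs. The scan returns the entry just past the last one it examined as the next read position, so a batch in which every entry is filtered out still moves the cursor forward instead of re-reading the same range. Unlike the `while` condition in the diff above, the loop is bounded with `&&` and a strict `<`, so it never scans past `lastEntryInLedger` or collects more than `entriesToRead` IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical, simplified model of a filtered read over a single ledger.
// Entry IDs stand in for PositionImpl; none of these names are Pulsar APIs.
final class FilteredReadSketch {

    /** The entry IDs to read, plus the entry ID where the next read should start. */
    record ScanResult(List<Long> entryIds, long nextReadEntry) { }

    static ScanResult scan(long firstEntry, long lastEntryInLedger, int entriesToRead,
                           LongPredicate skip) {
        List<Long> ids = new ArrayList<>();
        long entryId = firstEntry;
        // Stop at whichever comes first: the end of the ledger or enough entries collected.
        while (entryId <= lastEntryInLedger && ids.size() < entriesToRead) {
            if (!skip.test(entryId)) {
                ids.add(entryId);
            }
            entryId++;
        }
        // entryId is now one past the last *scanned* entry, so the read position
        // advances even when every scanned entry was skipped.
        return new ScanResult(ids, entryId);
    }
}
```

   For example, `scan(0, 9, 3, id -> id % 2 == 0)` yields entry IDs 1, 3 and 5 with a next read entry of 6, while `scan(0, 4, 3, id -> true)` yields no IDs but still advances to 5. In this simplified model, a value past `lastEntryInLedger` only means the ledger is exhausted.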



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -60,7 +61,18 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
-        op.skipCondition = skipCondition;
+
+        Predicate<PositionImpl> skipCondition0 = cursor instanceof ReadOnlyCursor ? null : cursor::isMessageDeleted;
+        if (skipCondition == null) {
+            op.skipCondition = skipCondition0;
+        } else {
+            if (skipCondition0 == null) {
+                op.skipCondition = skipCondition;
+            } else {
+                op.skipCondition = skipCondition.or(skipCondition0);
+            }
+        }

Review Comment:
   I think it would be better to pass a parameter that specifies whether to skip deleted messages, or to pass that check in through `skipCondition`, instead of changing the behavior of the read directly.
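   A minimal sketch of the first option, assuming an explicit boolean flag (`skipDeletedMessages`, a hypothetical name) is threaded through to where the predicate is built. Positions are modeled generically, `isMessageDeleted` stands in for `cursor::isMessageDeleted`, and `SkipConditionSketch.combine` is hypothetical, not the actual `OpReadEntry.create` signature. Deleted messages are only skipped when the caller opts in, so existing readers keep their current behavior.

```java
import java.util.function.Predicate;

// Hypothetical sketch, not the real OpReadEntry API. `isMessageDeleted`
// stands in for cursor::isMessageDeleted; T stands in for PositionImpl.
final class SkipConditionSketch {

    // Builds the skip predicate from the caller's condition and an explicit opt-in flag.
    // Returns null when nothing should be skipped, mirroring a null skipCondition.
    static <T> Predicate<T> combine(Predicate<T> callerSkip,
                                    boolean skipDeletedMessages,
                                    Predicate<T> isMessageDeleted) {
        Predicate<T> deleted = skipDeletedMessages ? isMessageDeleted : null;
        if (callerSkip == null) {
            return deleted;
        }
        return deleted == null ? callerSkip : callerSkip.or(deleted);
    }
}
```

   With the second option, the caller would instead fold `cursor::isMessageDeleted` into the `skipCondition` it already passes (for example via `Predicate.or`), leaving `OpReadEntry.create` unchanged.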



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
