dao-jun commented on code in PR #21739:
URL: https://github.com/apache/pulsar/pull/21739#discussion_r1429867309


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2088,44 +2094,132 @@ private void internalReadFromLedger(ReadHandle ledger, 
OpReadEntry opReadEntry)
             return;
         }
 
-        long lastEntry = min(firstEntry + 
opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+        Predicate<PositionImpl> skipCond = opReadEntry.skipCondition;
+        if (skipCond == null) {
+            long lastEntry = min(firstEntry + 
opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Reading entries from ledger {} - first={} 
last={}", name, ledger.getId(), firstEntry,
+                        lastEntry);
+            }
+            asyncReadEntry(
+                    ledger, firstEntry, lastEntry, 
opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx);
+            return;
+        }
 
-        // Filer out and skip unnecessary read entry
-        if (opReadEntry.skipCondition != null) {
-            long firstValidEntry = -1L;
-            long lastValidEntry = -1L;
-            long entryId = firstEntry;
-            for (; entryId <= lastEntry; entryId++) {
-                if 
(opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
-                    if (firstValidEntry != -1L) {
-                        break;
-                    }
-                } else {
-                    if (firstValidEntry == -1L) {
-                        firstValidEntry = entryId;
-                    }
+        // Try to read entries in the current ledger what we need through a 
single `entryIdSet` as much as possible.
+        long entryId = firstEntry;
+        int count = 0;
+        SortedSet<Long> entryIds = new TreeSet<>();
+        int entriesToRead = opReadEntry.getNumberOfEntriesToRead();
+        while (entryId <= lastEntryInLedger || count <= entriesToRead) {
+            PositionImpl position = PositionImpl.get(ledger.getId(), entryId);
+            if (!skipCond.test(position)) {
+                entryIds.add(entryId);
+                count++;
+            }
+            entryId++;
+        }
+        asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx);
+    }
 
-                    lastValidEntry = entryId;
+
+    private void asyncReadEntry(ReadHandle ledger, SortedSet<Long> entryIds, 
OpReadEntry opReadEntry, Object ctx) {
+        if (entryIds.isEmpty()) {
+            // If the entryIds is empty, should not move the `readPosition` of 
`cursor`.
+            // OpReadEntry#internalReadEntriesComplete will move the 
`readPosition` of `cursor`
+            // to the next position of `lastEntry`, so here uses the previous 
position of `readPosition`
+            // to offset the impact of OpReadEntry#internalReadEntriesComplete.
+            PositionImpl previous = 
this.getPreviousPosition(opReadEntry.readPosition);
+            opReadEntry.internalReadEntriesComplete(Collections.emptyList(), 
ctx, previous);
+            return;

Review Comment:
   @coderzc Thanks for the review. This PR is still a draft; I'm just wondering 
whether this improvement makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to