berg223 commented on code in PR #22560:
URL: https://github.com/apache/pulsar/pull/22560#discussion_r2107643732
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2208,42 +2211,140 @@ private void internalReadFromLedger(ReadHandle ledger,
OpReadEntry opReadEntry)
long lastEntry = min(firstEntry +
opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
- // Filer out and skip unnecessary read entry
- if (opReadEntry.skipCondition != null) {
- long firstValidEntry = -1L;
- long lastValidEntry = -1L;
- long entryId = firstEntry;
- for (; entryId <= lastEntry; entryId++) {
- if
(opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(),
entryId))) {
- if (firstValidEntry != -1L) {
- break;
- }
- } else {
- if (firstValidEntry == -1L) {
- firstValidEntry = entryId;
- }
+ Predicate<Position> skipCondition = opReadEntry.skipCondition;
+ if (skipCondition == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Reading entries from ledger {} - first={}
last={}", name, ledger.getId(), firstEntry,
+ lastEntry);
+ }
+ asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry,
opReadEntry.ctx);
+ return;
+ }
- lastValidEntry = entryId;
- }
+ // Skip entries that don't match the predicate
+ SortedSet<Long> entryIds = new TreeSet<>();
+ for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
+ Position position = PositionFactory.create(ledger.getId(),
entryId);
+ if (skipCondition.test(position)) {
+ continue;
}
+ entryIds.add(entryId);
+ }
- // If all messages in [firstEntry...lastEntry] are filter out,
- // then manual call internalReadEntriesComplete to advance read
position.
- if (firstValidEntry == -1L) {
-
opReadEntry.internalReadEntriesComplete(Collections.emptyList(),
opReadEntry.ctx,
- PositionFactory.create(ledger.getId(), lastEntry));
- return;
+ Position lastReadPosition = PositionFactory.create(ledger.getId(),
lastEntry);
+ if (entryIds.isEmpty()) {
+ // Move `readPosition` of `cursor`.
+ opReadEntry.internalReadEntriesComplete(Collections.emptyList(),
opReadEntry.ctx, lastReadPosition);
+ return;
+ }
+
+ List<Pair<Long, Long>> ranges = toRanges(entryIds);
+ ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds,
opReadEntry, lastReadPosition);
+ for (Pair<Long, Long> pair : ranges) {
+ long start = pair.getLeft();
+ long end = pair.getRight();
+ asyncReadEntry(ledger, start, end,
opReadEntry.cursor.isCacheReadEntry(), callback, opReadEntry.ctx);
+ }
+ }
+
+ @VisibleForTesting
+ public static List<Pair<Long, Long>> toRanges(SortedSet<Long> entryIds) {
+ List<Pair<Long, Long>> ranges = new ArrayList<>();
+ long start = entryIds.first();
+ long end = start;
+ for (long entryId : entryIds) {
+ if (entryId - end > 1) {
+ ranges.add(Pair.of(start, end));
+ start = entryId;
+ end = start;
+ } else {
+ end = entryId;
}
+ }
+ ranges.add(Pair.of(start, end));
+ return ranges;
+ }
- firstEntry = firstValidEntry;
- lastEntry = lastValidEntry;
+ @VisibleForTesting
+ public static class BatchReadEntriesCallback implements
ReadEntriesCallback {
+ private final SortedSet<Long> entryIds;
+ private final List<Entry> entries;
+ private final OpReadEntry callback;
+ private volatile boolean completed = false;
+ private final Position lastReadPosition;
+
+ @VisibleForTesting
+ public BatchReadEntriesCallback(SortedSet<Long> entryIdSet,
OpReadEntry callback,
+ Position lastReadPosition) {
+ this.entryIds = entryIdSet;
+ this.entries = new ArrayList<>(entryIdSet.size());
+ this.callback = callback;
+ this.lastReadPosition = lastReadPosition;
}
- if (log.isDebugEnabled()) {
- log.debug("[{}] Reading entries from ledger {} - first={}
last={}", name, ledger.getId(), firstEntry,
- lastEntry);
+ @Override
+ public synchronized void readEntriesComplete(List<Entry> entries0,
Object ctx) {
+ if (completed) {
+ return;
+ }
+ entries.addAll(entries0);
+ if (entries.size() < entryIds.size()) {
+ return;
+ }
+ completed = true;
+ // Make sure the entries are in the correct order
+ entries.sort(Comparator.comparingLong(Entry::getEntryId));
+ // If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3],
[4,5] are filtered, so we need to pass
+ // the `lastReadPosition([5])` to make sure the cursor read
position is correct.
+ callback.internalReadEntriesComplete(entries, ctx,
lastReadPosition);
+ }
+
+ @Override
+ public synchronized void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
+ if (completed) {
+ return;
+ }
+ completed = true;
+ // If there are entries been read success, try to let the read
operation success as possible.
+ List<Entry> entries = filterEntries();
+ if (!entries.isEmpty()) {
+ // Move the read position of the cursor to the next position
of the last read entry,
+ // or we will deliver the same entry to the consumer more than
once.
+ Entry entry = entries.get(entries.size() - 1);
+ Position position =
PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
+ Position nextReadPosition =
callback.cursor.getNextAvailablePosition(position);
+ callback.updateReadPosition(nextReadPosition);
+ }
+ callback.internalReadEntriesFailed(entries, exception, ctx);
+ }
+
+ /**
+ * Filter the entries that have been read success.
+ * <p>
+ * If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5]
successfully, [3] is read failed,
+ * only return [1,2] to the caller, to make sure the read operation
success as possible
+ * and keep the ordering guarantee.
+ *
+ * @return filtered entries
+ */
+ private List<Entry> filterEntries() {
+ if (entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<Entry> entries = new ArrayList<>();
+ for (long entryId : entryIds) {
+ if (this.entries.isEmpty()) {
+ break;
+ }
+ Entry entry = this.entries.remove(0);
Review Comment:
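Is `entries` guaranteed to be in ascending order here? For example:
1. The first read requests [1, 5].
2. The second read requests [7, 8].
3. The third read requests [10, 20].
4. [7, 8] is fetched from BK first, and `readEntriesComplete` appends it without sorting.
5. [1, 5] is fetched from BK next, and `readEntriesComplete` appends it without sorting.
6. Finally, the read of [10, 20] fails, and `readEntriesFailed` is called here.
After step 6, `entries` is [7, 8, 1, 2, 3, 4, 5], so it is not in order. The list is only sorted in `readEntriesComplete` once every entry has arrived. That means the `this.entries.remove(0)` loop in `filterEntries()` would not see the entries in the same order as the sorted `entryIds` it iterates over.
Have I missed something? @dao-jun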
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]