eolivelli commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956140682
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long
firstEntry, long lastEntry, boo
}
// Read all the entries from bookkeeper
- lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
- ledgerEntries -> {
- requireNonNull(ml.getName());
- requireNonNull(ml.getExecutor());
-
- try {
- // We got the entries, we need to transform them
to a List<> type
- long totalSize = 0;
- final List<EntryImpl> entriesToReturn =
Lists.newArrayListWithExpectedSize(entriesToRead);
- for (LedgerEntry e : ledgerEntries) {
- EntryImpl entry =
RangeEntryCacheManagerImpl.create(e, interceptor);
- entriesToReturn.add(entry);
- totalSize += entry.getLength();
- if (shouldCacheEntry) {
- EntryImpl cacheEntry =
EntryImpl.create(entry);
- insert(cacheEntry);
- cacheEntry.release();
- }
- }
-
-
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
- callback.readEntriesComplete((List)
entriesToReturn, ctx);
- } finally {
- ledgerEntries.close();
- }
- },
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+ final PendingReadKey key = new PendingReadKey(firstEntry,
lastEntry);
+
+ Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+ cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new
ConcurrentHashMap<>());
+
+ boolean listenerAdded = false;
+ while (!listenerAdded) {
+ AtomicBoolean createdByThisThread = new AtomicBoolean();
+ CachedPendingRead cachedPendingRead = findBestCandidate(key,
+ pendingReadsForLedger, createdByThisThread);
+ listenerAdded = cachedPendingRead.addListener(callback, ctx,
key.startEntry, key.endEntry);
+
+ if (createdByThisThread.get()) {
+ CompletableFuture<List<EntryImpl>> readResult =
lh.readAsync(firstEntry, lastEntry)
+ .thenApply(
+ ledgerEntries -> {
+ requireNonNull(ml.getName());
+ requireNonNull(ml.getExecutor());
+
+ try {
+ // We got the entries, we need to
transform them to a List<> type
+ long totalSize = 0;
+ final List<EntryImpl>
entriesToReturn =
+
Lists.newArrayListWithExpectedSize(entriesToRead);
+ for (LedgerEntry e :
ledgerEntries) {
+ EntryImpl entry =
RangeEntryCacheManagerImpl.create(e, interceptor);
+ entriesToReturn.add(entry);
+ totalSize += entry.getLength();
+ if (shouldCacheEntry) {
+ EntryImpl cacheEntry =
EntryImpl.create(entry);
+ insert(cacheEntry);
+ cacheEntry.release();
+ }
+ }
+
+
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+ return entriesToReturn;
+ } finally {
+ ledgerEntries.close();
+ }
+ });
+ // handle LH invalidation
+ readResult.exceptionally(exception -> {
if (exception instanceof BKException
&& ((BKException) exception).getCode() ==
BKException.Code.TooManyRequestsException) {
-
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
ml.invalidateLedgerHandle(lh);
- ManagedLedgerException mlException =
createManagedLedgerException(exception);
- callback.readEntriesFailed(mlException, ctx);
}
return null;
- });
+ });
+
+ cachedPendingRead.attach(readResult);
+ }
+ }
}
}
@Override
public void clear() {
Pair<Integer, Long> removedPair = entries.clear();
manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+ cachedPendingReads.clear();
Review Comment:
the elements here are one entry per ledger.
the per ledger maps are evicted here
https://github.com/apache/pulsar/pull/17241/files#diff-c55509e3ab1389d89a58fd564f2e318dbb95f50121ab33c729a7ca4a21d02ef1R340
We could remove the entry for a ledger in case of rollover.
but I thought it requires more coordination.
I will try to improve this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]