eolivelli commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956141833
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long
firstEntry, long lastEntry, boo
}
// Read all the entries from bookkeeper
- lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
- ledgerEntries -> {
- requireNonNull(ml.getName());
- requireNonNull(ml.getExecutor());
-
- try {
- // We got the entries, we need to transform them
to a List<> type
- long totalSize = 0;
- final List<EntryImpl> entriesToReturn =
Lists.newArrayListWithExpectedSize(entriesToRead);
- for (LedgerEntry e : ledgerEntries) {
- EntryImpl entry =
RangeEntryCacheManagerImpl.create(e, interceptor);
- entriesToReturn.add(entry);
- totalSize += entry.getLength();
- if (shouldCacheEntry) {
- EntryImpl cacheEntry =
EntryImpl.create(entry);
- insert(cacheEntry);
- cacheEntry.release();
- }
- }
-
-
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
- callback.readEntriesComplete((List)
entriesToReturn, ctx);
- } finally {
- ledgerEntries.close();
- }
- },
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+ final PendingReadKey key = new PendingReadKey(firstEntry,
lastEntry);
+
+ Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+ cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new
ConcurrentHashMap<>());
+
+ boolean listenerAdded = false;
+ while (!listenerAdded) {
+ AtomicBoolean createdByThisThread = new AtomicBoolean();
+ CachedPendingRead cachedPendingRead = findBestCandidate(key,
Review Comment:
findBestCandidate is kind of a computeIfAbsent method (when I was not
looking for "includes" it was actually computeIfAbsent on a ConcurrentHashMap).
So I have to create the object and put it into the map inside the "lock"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]