eolivelli commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956140682


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long 
firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
-
-                        try {
-                            // We got the entries, we need to transform them 
to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = 
EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
-                                }
-                            }
-
-                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
-                            callback.readEntriesComplete((List) 
entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, 
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            final PendingReadKey key = new PendingReadKey(firstEntry, 
lastEntry);
+
+            Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+                    cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new 
ConcurrentHashMap<>());
+
+            boolean listenerAdded = false;
+            while (!listenerAdded) {
+                AtomicBoolean createdByThisThread = new AtomicBoolean();
+                CachedPendingRead cachedPendingRead = findBestCandidate(key,
+                        pendingReadsForLedger, createdByThisThread);
+                listenerAdded = cachedPendingRead.addListener(callback, ctx, 
key.startEntry, key.endEntry);
+
+                if (createdByThisThread.get()) {
+                    CompletableFuture<List<EntryImpl>> readResult = 
lh.readAsync(firstEntry, lastEntry)
+                            .thenApply(
+                                    ledgerEntries -> {
+                                        requireNonNull(ml.getName());
+                                        requireNonNull(ml.getExecutor());
+
+                                        try {
+                                            // We got the entries, we need to 
transform them to a List<> type
+                                            long totalSize = 0;
+                                            final List<EntryImpl> 
entriesToReturn =
+                                                    
Lists.newArrayListWithExpectedSize(entriesToRead);
+                                            for (LedgerEntry e : 
ledgerEntries) {
+                                                EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
+                                                entriesToReturn.add(entry);
+                                                totalSize += entry.getLength();
+                                                if (shouldCacheEntry) {
+                                                    EntryImpl cacheEntry = 
EntryImpl.create(entry);
+                                                    insert(cacheEntry);
+                                                    cacheEntry.release();
+                                                }
+                                            }
+
+                                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                                            
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+                                            return entriesToReturn;
+                                        } finally {
+                                            ledgerEntries.close();
+                                        }
+                                    });
+                    // handle LH invalidation
+                    readResult.exceptionally(exception -> {
                         if (exception instanceof BKException
                                 && ((BKException) exception).getCode() == 
BKException.Code.TooManyRequestsException) {
-                            
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                         } else {
                             ml.invalidateLedgerHandle(lh);
-                            ManagedLedgerException mlException = 
createManagedLedgerException(exception);
-                            callback.readEntriesFailed(mlException, ctx);
                         }
                         return null;
-            });
+                    });
+
+                    cachedPendingRead.attach(readResult);
+                }
+            }
         }
     }
 
     @Override
     public void clear() {
         Pair<Integer, Long> removedPair = entries.clear();
         manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+        cachedPendingReads.clear();

Review Comment:
   the elements here are one entry per ledger.
   the per ledger maps are evicted here
   
https://github.com/apache/pulsar/pull/17241/files#diff-c55509e3ab1389d89a58fd564f2e318dbb95f50121ab33c729a7ca4a21d02ef1R340
   
   We could remove the entry for a ledger in case of rollover.
   but I thought it requires more coordination.
   I will try to improve this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to