codelipenghui commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956518885


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long 
firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
-
-                        try {
-                            // We got the entries, we need to transform them 
to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = 
EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
-                                }
-                            }
-
-                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
-                            callback.readEntriesComplete((List) 
entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, 
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            final PendingReadKey key = new PendingReadKey(firstEntry, 
lastEntry);
+
+            Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+                    cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new 
ConcurrentHashMap<>());
+
+            boolean listenerAdded = false;
+            while (!listenerAdded) {
+                AtomicBoolean createdByThisThread = new AtomicBoolean();
+                CachedPendingRead cachedPendingRead = findBestCandidate(key,

Review Comment:
   Yes, I just want to avoid creating `AtomicBoolean` here.
   Can we use the callback size of the `cachedPendingRead` or a boolean inside 
`CachedPendingRead`?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long 
firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
-
-                        try {
-                            // We got the entries, we need to transform them 
to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = 
EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
-                                }
-                            }
-
-                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
-                            callback.readEntriesComplete((List) 
entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, 
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            final PendingReadKey key = new PendingReadKey(firstEntry, 
lastEntry);
+
+            Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+                    cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new 
ConcurrentHashMap<>());
+
+            boolean listenerAdded = false;
+            while (!listenerAdded) {
+                AtomicBoolean createdByThisThread = new AtomicBoolean();
+                CachedPendingRead cachedPendingRead = findBestCandidate(key,
+                        pendingReadsForLedger, createdByThisThread);
+                listenerAdded = cachedPendingRead.addListener(callback, ctx, 
key.startEntry, key.endEntry);
+
+                if (createdByThisThread.get()) {
+                    CompletableFuture<List<EntryImpl>> readResult = 
lh.readAsync(firstEntry, lastEntry)
+                            .thenApply(
+                                    ledgerEntries -> {
+                                        requireNonNull(ml.getName());
+                                        requireNonNull(ml.getExecutor());
+
+                                        try {
+                                            // We got the entries, we need to 
transform them to a List<> type
+                                            long totalSize = 0;
+                                            final List<EntryImpl> 
entriesToReturn =
+                                                    
Lists.newArrayListWithExpectedSize(entriesToRead);
+                                            for (LedgerEntry e : 
ledgerEntries) {
+                                                EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
+                                                entriesToReturn.add(entry);
+                                                totalSize += entry.getLength();
+                                                if (shouldCacheEntry) {
+                                                    EntryImpl cacheEntry = 
EntryImpl.create(entry);
+                                                    insert(cacheEntry);
+                                                    cacheEntry.release();
+                                                }
+                                            }
+
+                                            
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                                            
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+                                            return entriesToReturn;
+                                        } finally {
+                                            ledgerEntries.close();
+                                        }
+                                    });
+                    // handle LH invalidation
+                    readResult.exceptionally(exception -> {
                         if (exception instanceof BKException
                                 && ((BKException) exception).getCode() == 
BKException.Code.TooManyRequestsException) {
-                            
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                         } else {
                             ml.invalidateLedgerHandle(lh);
-                            ManagedLedgerException mlException = 
createManagedLedgerException(exception);
-                            callback.readEntriesFailed(mlException, ctx);
                         }
                         return null;
-            });
+                    });
+
+                    cachedPendingRead.attach(readResult);
+                }
+            }
         }
     }
 
     @Override
     public void clear() {
         Pair<Integer, Long> removedPair = entries.clear();
         manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+        cachedPendingReads.clear();

Review Comment:
   > We could remove the entry for a ledger in case of rollover.
   but I thought it requires more coordination.
   I will try to improve this
   
   Yes, we are on the same page.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long 
firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, 
Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {

Review Comment:
   Oh, sorry. I read it wrong, I thought it was equal to synchronized(this).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to