codelipenghui commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956101966


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
-
-                        try {
-                            // We got the entries, we need to transform them to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
-                                }
-                            }
-
-                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
-                            callback.readEntriesComplete((List) entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+            Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+                    cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new ConcurrentHashMap<>());
+
+            boolean listenerAdded = false;
+            while (!listenerAdded) {
+                AtomicBoolean createdByThisThread = new AtomicBoolean();
+                CachedPendingRead cachedPendingRead = findBestCandidate(key,

Review Comment:
   I think we can make `findBestCandidate()` return null when there is no candidate, and then create a new one to continue the read operation.
   
   That way we don't need `AtomicBoolean createdByThisThread = new AtomicBoolean();` and the code is easier to read.
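   A rough sketch of what that could look like (sketch only, reusing the names from the quoted diff; `createPendingRead` is a hypothetical helper that would register the new instance under the same lock used by the lookup):
   
   ```java
   CachedPendingRead cachedPendingRead = findBestCandidate(key, pendingReadsForLedger); // returns null if no candidate
   boolean createdByThisThread = cachedPendingRead == null;
   if (createdByThisThread) {
       // hypothetical helper: creates the CachedPendingRead and puts it into pendingReadsForLedger atomically
       cachedPendingRead = createPendingRead(key, pendingReadsForLedger);
   }
   ```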



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            CachedPendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                return existing;
+            }
+            for (Map.Entry<PendingReadKey, CachedPendingRead> entry : ledgerCache.entrySet()) {
+                if (entry.getKey().includes(key)) {
+                    return entry.getValue();
+                }
+            }
+            created.set(true);
+            CachedPendingRead newRead = new CachedPendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            return newRead;
+        }
+    }
+
+    private class CachedPendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, CachedPendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public CachedPendingRead(PendingReadKey key,
+                                 Map<PendingReadKey, CachedPendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (CachedPendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (CachedPendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                             && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                            first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                            first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            List<EntryImpl> copy = new ArrayList<>(entriesToReturn.size());
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;

Review Comment:
   Looks like we can change this to:
   
   long callbackStartEntry = callback.startEntry;
   long callbackEndEntry = callback.endEntry;
   List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            CachedPendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                return existing;
+            }
+            for (Map.Entry<PendingReadKey, CachedPendingRead> entry : ledgerCache.entrySet()) {
+                if (entry.getKey().includes(key)) {
+                    return entry.getValue();
+                }
+            }
+            created.set(true);
+            CachedPendingRead newRead = new CachedPendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            return newRead;
+        }
+    }
+
+    private class CachedPendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, CachedPendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public CachedPendingRead(PendingReadKey key,
+                                 Map<PendingReadKey, CachedPendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {

Review Comment:
   Do we need to remove the `CachedPendingRead` from the `cachedPendingReads`?
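   For example, something along these lines in the completion handler could also drop the per-ledger map once it becomes empty (just a sketch; it assumes the ledger id is reachable from `CachedPendingRead`, which the quoted diff doesn't show):
   
   ```java
   handle.whenComplete((___, error) -> {
       synchronized (ledgerCache) {
           ledgerCache.remove(key, this);
           if (ledgerCache.isEmpty()) {
               // hypothetical: requires the ledger id to be available here
               cachedPendingReads.remove(ledgerId, ledgerCache);
           }
       }
   });
   ```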



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {

Review Comment:
   We can move the `synchronized` keyword to the method declaration.
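   Roughly like this (sketch only; note that a method-level `synchronized` locks on the enclosing cache instance rather than on `ledgerCache`):
   
   ```java
   private synchronized CachedPendingRead findBestCandidate(PendingReadKey key,
                                                             Map<PendingReadKey, CachedPendingRead> ledgerCache,
                                                             AtomicBoolean created) {
       CachedPendingRead existing = ledgerCache.get(key);
       if (existing != null) {
           return existing;
       }
       // ... rest of the lookup/creation logic unchanged, minus the synchronized (ledgerCache) block
   }
   ```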



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            CachedPendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                return existing;
+            }
+            for (Map.Entry<PendingReadKey, CachedPendingRead> entry : ledgerCache.entrySet()) {
+                if (entry.getKey().includes(key)) {
+                    return entry.getValue();
+                }
+            }
+            created.set(true);
+            CachedPendingRead newRead = new CachedPendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            return newRead;
+        }
+    }
+
+    private class CachedPendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, CachedPendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public CachedPendingRead(PendingReadKey key,
+                                 Map<PendingReadKey, CachedPendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (CachedPendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (CachedPendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                             && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                            first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                            first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            List<EntryImpl> copy = new ArrayList<>(entriesToReturn.size());
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        for (EntryImpl entry : entriesToReturn) {
+                            entry.release();
+                        }
+                    }
+                }
+            }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                synchronized (CachedPendingRead.this) {
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        if (exception instanceof BKException

Review Comment:
   We should unwrap the exception before this `instanceof` check, since the future usually completes with a `CompletionException` that wraps the real cause.
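   For example (sketch only, plain unwrapping without any helper):
   
   ```java
   // unwrap a possible CompletionException before inspecting the cause
   Throwable cause = (exception instanceof CompletionException && exception.getCause() != null)
           ? exception.getCause() : exception;
   if (cause instanceof BKException
           && ((BKException) cause).getCode() == BKException.Code.TooManyRequestsException) {
       // ... handle TooManyRequestsException as before
   }
   ```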



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +439,70 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
-
-                        try {
-                            // We got the entries, we need to transform them to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
-                                }
-                            }
-
-                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
-
-                            callback.readEntriesComplete((List) entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+            Map<PendingReadKey, CachedPendingRead> pendingReadsForLedger =
+                    cachedPendingReads.computeIfAbsent(ledgerId, (l) -> new ConcurrentHashMap<>());
+
+            boolean listenerAdded = false;
+            while (!listenerAdded) {
+                AtomicBoolean createdByThisThread = new AtomicBoolean();
+                CachedPendingRead cachedPendingRead = findBestCandidate(key,
+                        pendingReadsForLedger, createdByThisThread);
+                listenerAdded = cachedPendingRead.addListener(callback, ctx, key.startEntry, key.endEntry);
+
+                if (createdByThisThread.get()) {
+                    CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
+                            .thenApply(
+                                    ledgerEntries -> {
+                                        requireNonNull(ml.getName());
+                                        requireNonNull(ml.getExecutor());
+
+                                        try {
+                                            // We got the entries, we need to transform them to a List<> type
+                                            long totalSize = 0;
+                                            final List<EntryImpl> entriesToReturn =
+                                                    Lists.newArrayListWithExpectedSize(entriesToRead);
+                                            for (LedgerEntry e : ledgerEntries) {
+                                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+                                                entriesToReturn.add(entry);
+                                                totalSize += entry.getLength();
+                                                if (shouldCacheEntry) {
+                                                    EntryImpl cacheEntry = EntryImpl.create(entry);
+                                                    insert(cacheEntry);
+                                                    cacheEntry.release();
+                                                }
+                                            }
+
+                                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+
+                                            return entriesToReturn;
+                                        } finally {
+                                            ledgerEntries.close();
+                                        }
+                                    });
+                    // handle LH invalidation
+                    readResult.exceptionally(exception -> {
                         if (exception instanceof BKException
                                 && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
-                            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                         } else {
                             ml.invalidateLedgerHandle(lh);
-                            ManagedLedgerException mlException = createManagedLedgerException(exception);
-                            callback.readEntriesFailed(mlException, ctx);
                         }
                         return null;
-            });
+                    });
+
+                    cachedPendingRead.attach(readResult);
+                }
+            }
         }
     }
 
     @Override
     public void clear() {
         Pair<Integer, Long> removedPair = entries.clear();
         manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+        cachedPendingReads.clear();

Review Comment:
   I noticed the entries of `cachedPendingReads` are only removed here, and this method is only called when all of the cached data should be cleaned up.
   
   It's better to add a unit test for this to make sure we don't introduce a heap memory leak.
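   Something along these lines, for example (just a sketch: `readAndRelease` and `getCachedPendingReads()` are hypothetical test helpers/accessors that would have to be added):
   
   ```java
   @Test
   public void testCachedPendingReadsAreCleanedUp() throws Exception {
       // trigger a read that goes through asyncReadEntry0 and wait for it to complete
       readAndRelease(entryCache, readHandle, 0, 9);
   
       // once every pending read has completed, the bookkeeping map should hold no stale entries
       assertTrue(entryCache.getCachedPendingReads().values().stream().allMatch(Map::isEmpty));
   }
   ```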



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
         }
     }
 
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
+                                                AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            CachedPendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                return existing;
+            }
+            for (Map.Entry<PendingReadKey, CachedPendingRead> entry : ledgerCache.entrySet()) {
+                if (entry.getKey().includes(key)) {
+                    return entry.getValue();
+                }
+            }
+            created.set(true);
+            CachedPendingRead newRead = new CachedPendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            return newRead;
+        }
+    }
+
+    private class CachedPendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, CachedPendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public CachedPendingRead(PendingReadKey key,
+                                 Map<PendingReadKey, CachedPendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (CachedPendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (CachedPendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                             && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                            first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                            first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            List<EntryImpl> copy = new ArrayList<>(entriesToReturn.size());
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        for (EntryImpl entry : entriesToReturn) {
+                            entry.release();
+                        }
+                    }
+                }
+            }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                synchronized (CachedPendingRead.this) {
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        if (exception instanceof BKException

Review Comment:
   And it looks like we can just use `createManagedLedgerException` directly, without the `if` check here.
   
   ```java
       public static ManagedLedgerException createManagedLedgerException(Throwable t) {
           if (t instanceof org.apache.bookkeeper.client.api.BKException) {
               return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
           } else if (t instanceof CompletionException
                   && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
               return createManagedLedgerException(t.getCause());
           } else {
               log.error("Unknown exception for ManagedLedgerException.", t);
               return new ManagedLedgerException("Other exception", t);
           }
       }
   ```
   It already handles the `CompletionException` and `TooManyRequestsException` cases.
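   So the `exceptionally` branch could probably be reduced to something like this (sketch only, reusing the names from the quoted diff):
   
   ```java
   for (ReadEntriesCallbackWithContext callback : callbacks) {
       callback.callback.readEntriesFailed(createManagedLedgerException(exception), callback.ctx);
   }
   ```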



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
