merlimat commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933072176


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -337,89 +403,36 @@ void asyncReadEntry0WithLimits(ReadHandle lh, long 
firstEntry, long lastEntry, b
 
             manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), 
totalCachedSize);
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", 
ml.getName(), ledgerId, firstEntry,
-                        lastEntry);
+                log.debug("[{}] Cache hit for {} entries in range {} to {}", 
ml.getName(), numberOfEntries,
+                        firstPosition, lastPosition);
             }
 
-            callback.readEntriesComplete((List) entriesToReturn, ctx);
+            callback.readEntriesComplete(entriesToReturn, ctx);
 
         } else {
             if (!cachedEntries.isEmpty()) {
                 cachedEntries.forEach(entry -> entry.release());
             }
 
             // Read all the entries from bookkeeper
-            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+            pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), 
lastPosition.getEntryId(),
                     shouldCacheEntry, callback, ctx);
-
         }
     }
 
-    private AsyncCallbacks.ReadEntriesCallback 
handlePendingReadsLimits(ReadHandle lh,
-                                                                long 
firstEntry, long lastEntry,
-                                                                boolean 
shouldCacheEntry,
-                                                                
AsyncCallbacks.ReadEntriesCallback originalCallback,
-                                                                Object ctx, 
InflightReadsLimiter.Handle handle) {
-        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
-        if (pendingReadsLimiter.isDisabled()) {
-            return originalCallback;
+    @VisibleForTesting
+    public long getEstimatedEntrySize() {
+        long estimatedEntrySize = getAvgEntrySize();
+        if (estimatedEntrySize == 0) {
+            estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
         }
-        long estimatedReadSize = (1 + lastEntry - firstEntry)
-                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
-        final AsyncCallbacks.ReadEntriesCallback callback;
-        InflightReadsLimiter.Handle newHandle = 
pendingReadsLimiter.acquire(estimatedReadSize, handle);
-        if (!newHandle.success) {
-            long now = System.currentTimeMillis();
-            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
-                String message = "Time-out elapsed while acquiring enough 
permits "
-                        + "on the memory limiter to read from ledger "
-                        + lh.getId()
-                        + ", " + getName()
-                        + ", estimated read size " + estimatedReadSize + " 
bytes"
-                        + " for " + (1 + lastEntry - firstEntry)
-                        + " entries (check 
managedLedgerMaxReadsInFlightSizeInMB)";
-                log.error(message);
-                pendingReadsLimiter.release(newHandle);
-                originalCallback.readEntriesFailed(
-                        new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
-                return null;
-            }
-            ml.getExecutor().execute(() -> {
-                asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
-                        originalCallback, ctx, newHandle, true);
-            });
-            return null;
-        } else {
-            callback = new AsyncCallbacks.ReadEntriesCallback() {
-
-                @Override
-                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
-                    if (!entries.isEmpty()) {
-                        long size = entries.get(0).getLength();
-                        estimatedEntrySize = size;
-
-                        AtomicInteger remainingCount = new 
AtomicInteger(entries.size());
-                        for (Entry entry : entries) {
-                            ((EntryImpl) entry).onDeallocate(() -> {
-                                if (remainingCount.decrementAndGet() <= 0) {
-                                    pendingReadsLimiter.release(newHandle);
-                                }
-                            });
-                        }
-                    } else {
-                        pendingReadsLimiter.release(newHandle);
-                    }
-                    originalCallback.readEntriesComplete(entries, ctx);
-                }
+        return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+    }
 
-                @Override
-                public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
-                    pendingReadsLimiter.release(newHandle);
-                    originalCallback.readEntriesFailed(exception, ctx);
-                }
-            };
-        }
-        return callback;
+    private long getAvgEntrySize() {

Review Comment:
   Is this going to get called frequently? Calling `LongAdder.sum()` might
have a performance impact.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,38 +274,125 @@ public void asyncReadEntry(ReadHandle lh, long 
firstEntry, long lastEntry, boole
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
-            final ReadEntriesCallback callback, Object ctx, boolean 
withLimits) {
-        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, null, withLimits);
+            final ReadEntriesCallback callback, Object ctx, boolean 
acquirePermits) {
+        final long ledgerId = lh.getId();
+        final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+        final Position firstPosition = PositionFactory.create(ledgerId, 
firstEntry);
+        final Position lastPosition = PositionFactory.create(ledgerId, 
lastEntry);
+        asyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry, callback, ctx,
+                acquirePermits);
     }
 
-    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long 
lastEntry, boolean shouldCacheEntry,
-        final ReadEntriesCallback originalCallback, Object ctx, 
InflightReadsLimiter.Handle handle,
-                                   boolean withLimits) {
-        AsyncCallbacks.ReadEntriesCallback callback;
-        if (withLimits) {
-            callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry, originalCallback, ctx,
-                    handle);
+    void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, 
Position lastPosition, int numberOfEntries,
+                                    boolean shouldCacheEntry, final 
ReadEntriesCallback originalCallback,
+                                    Object ctx, boolean acquirePermits) {
+        checkArgument(firstPosition.getLedgerId() == 
lastPosition.getLedgerId(),
+                "Invalid range. Entries %s and %s should be in the same 
ledger.",
+                firstPosition, lastPosition);
+        checkArgument(firstPosition.getLedgerId() == lh.getId(),
+                "Invalid ReadHandle. The ledger %s of the range positions 
should match the handle's ledger %s.",
+                firstPosition.getLedgerId(), lh.getId());
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading {} entries in range {} to {}", 
ml.getName(), numberOfEntries, firstPosition,
+                    lastPosition);
+        }
+
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+            doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry,
+                    originalCallback, ctx);
         } else {
-            callback = originalCallback;
+            long estimatedEntrySize = getEstimatedEntrySize();
+            long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+            if (log.isDebugEnabled()) {
+                log.debug("Estimated read size: {} bytes for {} entries with 
{} estimated entry size",
+                        estimatedReadSize,
+                        numberOfEntries, estimatedEntrySize);
+            }
+            Optional<InflightReadsLimiter.Handle> optionalHandle =
+                    pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+                        // permits were not immediately available, callback 
will be executed when permits are acquired
+                        // or timeout
+                        ml.getExecutor().execute(() -> {
+                            doAsyncReadEntriesWithAcquiredPermits(lh, 
firstPosition, lastPosition, numberOfEntries,
+                                    shouldCacheEntry, originalCallback, ctx, 
handle, estimatedReadSize);
+                        });
+                    });
+            // permits were immediately available and acquired
+            if (optionalHandle.isPresent()) {
+                doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, 
lastPosition, numberOfEntries,
+                        shouldCacheEntry, originalCallback, ctx, 
optionalHandle.get(), estimatedReadSize);
+            }
         }
-        if (callback == null) {
+    }
+
+    void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position 
firstPosition, Position lastPosition,
+                                               int numberOfEntries, boolean 
shouldCacheEntry,
+                                               final ReadEntriesCallback 
originalCallback, Object ctx,
+                                               InflightReadsLimiter.Handle 
handle, long estimatedReadSize) {
+        if (!handle.success) {
+            String message = "Couldn't acquire enough permits "

Review Comment:
   nit: maybe use `String.format()`?
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -232,8 +232,66 @@ public PendingRead(PendingReadKey key,
             this.ledgerCache = ledgerCache;
         }
 
-        private List<EntryImpl> keepEntries(List<EntryImpl> list, long 
startEntry, long endEntry) {
-            List<EntryImpl> result = new ArrayList<>((int) (endEntry - 
startEntry));
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            handle.whenComplete((entriesToReturn, error) -> {
+                // execute in the completing thread
+                completeAndRemoveFromCache();
+                // execute the callbacks in the managed ledger executor
+                rangeEntryCache.getManagedLedger().getExecutor().execute(() -> 
{
+                    if (error != null) {
+                        readEntriesFailed(error);
+                    } else {
+                        readEntriesComplete(entriesToReturn);
+                    }
+                });
+            });
+        }
+
+        private synchronized void completeAndRemoveFromCache() {
+            completed = true;
+            // When the read has completed, remove the instance from the 
ledgerCache map
+            // so that new reads will go to a new instance.
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            synchronized (ledgerCache) {

Review Comment:
   Could this nested locking end up in a deadlock if another code path acquires the two locks in the opposite order?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to