lhotari commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933491175


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,38 +274,125 @@ public void asyncReadEntry(ReadHandle lh, long 
firstEntry, long lastEntry, boole
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
-            final ReadEntriesCallback callback, Object ctx, boolean 
withLimits) {
-        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, null, withLimits);
+            final ReadEntriesCallback callback, Object ctx, boolean 
acquirePermits) {
+        final long ledgerId = lh.getId();
+        final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+        final Position firstPosition = PositionFactory.create(ledgerId, 
firstEntry);
+        final Position lastPosition = PositionFactory.create(ledgerId, 
lastEntry);
+        asyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry, callback, ctx,
+                acquirePermits);
     }
 
-    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long 
lastEntry, boolean shouldCacheEntry,
-        final ReadEntriesCallback originalCallback, Object ctx, 
InflightReadsLimiter.Handle handle,
-                                   boolean withLimits) {
-        AsyncCallbacks.ReadEntriesCallback callback;
-        if (withLimits) {
-            callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry, originalCallback, ctx,
-                    handle);
+    void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, 
Position lastPosition, int numberOfEntries,
+                                    boolean shouldCacheEntry, final 
ReadEntriesCallback originalCallback,
+                                    Object ctx, boolean acquirePermits) {
+        checkArgument(firstPosition.getLedgerId() == 
lastPosition.getLedgerId(),
+                "Invalid range. Entries %s and %s should be in the same 
ledger.",
+                firstPosition, lastPosition);
+        checkArgument(firstPosition.getLedgerId() == lh.getId(),
+                "Invalid ReadHandle. The ledger %s of the range positions 
should match the handle's ledger %s.",
+                firstPosition.getLedgerId(), lh.getId());
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading {} entries in range {} to {}", 
ml.getName(), numberOfEntries, firstPosition,
+                    lastPosition);
+        }
+
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+            doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry,
+                    originalCallback, ctx);
         } else {
-            callback = originalCallback;
+            long estimatedEntrySize = getEstimatedEntrySize();
+            long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+            if (log.isDebugEnabled()) {
+                log.debug("Estimated read size: {} bytes for {} entries with 
{} estimated entry size",
+                        estimatedReadSize,
+                        numberOfEntries, estimatedEntrySize);
+            }
+            Optional<InflightReadsLimiter.Handle> optionalHandle =
+                    pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+                        // permits were not immediately available, callback 
will be executed when permits are acquired
+                        // or timeout
+                        ml.getExecutor().execute(() -> {
+                            doAsyncReadEntriesWithAcquiredPermits(lh, 
firstPosition, lastPosition, numberOfEntries,
+                                    shouldCacheEntry, originalCallback, ctx, 
handle, estimatedReadSize);
+                        });
+                    });
+            // permits were immediately available and acquired
+            if (optionalHandle.isPresent()) {
+                doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, 
lastPosition, numberOfEntries,
+                        shouldCacheEntry, originalCallback, ctx, 
optionalHandle.get(), estimatedReadSize);
+            }
         }
-        if (callback == null) {
+    }
+
+    void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position 
firstPosition, Position lastPosition,
+                                               int numberOfEntries, boolean 
shouldCacheEntry,
+                                               final ReadEntriesCallback 
originalCallback, Object ctx,
+                                               InflightReadsLimiter.Handle 
handle, long estimatedReadSize) {
+        if (!handle.success) {
+            String message = "Couldn't acquire enough permits "

Review Comment:
   Yes, that makes it cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to