eolivelli commented on code in PR #17953:
URL: https://github.com/apache/pulsar/pull/17953#discussion_r994749999


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -437,6 +458,60 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
+    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
+                                                                long firstEntry, long lastEntry,
+                                                                boolean shouldCacheEntry,
+                                                                AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                Object ctx, PendingReadsLimiter.Handle handle) {
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        PendingReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed reading from ledger "
+                        + lh.getId()
+                        + ", " + rangeEntryCache.getName()
+                        + ", estimated read size " + estimatedReadSize + " bytes"
+                        + " for " + (1 + lastEntry - firstEntry) + " entries";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;

Review Comment:
   > Can we make this reactive by queuing cursors that have requested more 
entries and then feed those cursors as memory gets freed? 
   
   
   I have thought about creating some kind of "async rate limiter", but you 
would still have to keep a list (or priority queue) of the reads that are 
waiting for the needed number of permits to become available.
   
   Then the problem would be to implement some "fair" algorithm that:
   - does not let the pending reads starve
   - tries to be "fair" and serves the pending reads in FIFO order.
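
   To make that alternative concrete, here is a minimal sketch of what such a 
limiter might look like. It is only an illustration: `FifoAsyncLimiter` and all 
of its methods are hypothetical names and are not part of this PR or of Pulsar.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an "async rate limiter" with a strict FIFO wait queue.
// Caveat: a request larger than the total budget would wait forever.
class FifoAsyncLimiter {
    private static final class Waiter {
        final long bytes;
        final Runnable onGranted;
        Waiter(long bytes, Runnable onGranted) {
            this.bytes = bytes;
            this.onGranted = onGranted;
        }
    }

    private final ArrayDeque<Waiter> waiters = new ArrayDeque<>();
    private long available;

    FifoAsyncLimiter(long totalBytes) {
        this.available = totalBytes;
    }

    // Grants at once only if nobody is already waiting; otherwise enqueues.
    void acquire(long bytes, Runnable onGranted) {
        synchronized (this) {
            if (!waiters.isEmpty() || available < bytes) {
                waiters.addLast(new Waiter(bytes, onGranted));
                return;
            }
            available -= bytes;
        }
        onGranted.run();
    }

    // Returns permits and serves waiters from the head while they fit.
    void release(long bytes) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            available += bytes;
            while (!waiters.isEmpty() && waiters.peekFirst().bytes <= available) {
                Waiter w = waiters.pollFirst();
                available -= w.bytes;
                ready.add(w.onGranted);
            }
        }
        ready.forEach(Runnable::run); // run callbacks outside the lock
    }

    static List<String> demo() {
        List<String> order = new ArrayList<>();
        FifoAsyncLimiter limiter = new FifoAsyncLimiter(100);
        limiter.acquire(100, () -> order.add("a")); // granted immediately
        limiter.acquire(80, () -> order.add("b"));  // queued
        limiter.acquire(10, () -> order.add("c"));  // queued behind b
        limiter.release(50); // 50 free: b (80) does not fit, so c waits too
        order.add("first release");
        limiter.release(50); // 100 free: b, then c
        return order;
    }
}
```

   In this sketch strict FIFO means a large read at the head of the queue holds 
back every smaller read behind it. With one global queue, that would let a 
single ML delay all the others.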
   
   
   With the current mechanism we schedule the read on the thread that is 
pinned to the ManagedLedger (ML), and this naturally adds some back pressure 
that depends on the read demand of that ML.
   
   In one broker all the MLs compete for the available memory, and I don't 
want to keep a single global list of pending reads, as it would be really hard 
to make it "fair".
   
   With this approach each ML basically has its own list of waiting 
pending reads (the internal queue of the pinned executor).
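
   As a rough, simplified model of that mechanism (not the actual code of this 
PR): `SharedBudget` and `PinnedRetryReader` below are hypothetical stand-ins, 
and a plain single-thread executor plays the role of the thread pinned to one ML.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the broker-wide memory limit shared by all MLs.
class SharedBudget {
    private final AtomicLong available;

    SharedBudget(long bytes) {
        this.available = new AtomicLong(bytes);
    }

    boolean tryAcquire(long bytes) {
        while (true) {
            long cur = available.get();
            if (cur < bytes) {
                return false;
            }
            if (available.compareAndSet(cur, cur - bytes)) {
                return true;
            }
        }
    }

    void release(long bytes) {
        available.addAndGet(bytes);
    }

    long available() {
        return available.get();
    }
}

// Hypothetical per-ML reader: a read that cannot get memory is re-queued on
// this ML's own executor, so waiting reads live in that executor's queue.
class PinnedRetryReader {
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    private final SharedBudget budget;
    private final long timeoutMillis;

    PinnedRetryReader(SharedBudget budget, long timeoutMillis) {
        this.budget = budget;
        this.timeoutMillis = timeoutMillis;
    }

    void read(long bytes, long startMillis, Runnable onGranted, Runnable onTimeout) {
        if (budget.tryAcquire(bytes)) {
            onGranted.run();
            return;
        }
        if (System.currentTimeMillis() - startMillis > timeoutMillis) {
            onTimeout.run();
            return;
        }
        // Not enough memory yet: go to the back of this ML's queue and retry.
        pinned.execute(() -> read(bytes, startMillis, onGranted, onTimeout));
    }

    void shutdown() {
        pinned.shutdown();
    }

    static boolean demo() {
        SharedBudget budget = new SharedBudget(100);
        budget.tryAcquire(100); // another ML holds all the memory
        PinnedRetryReader reader = new PinnedRetryReader(budget, 5_000);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean granted = new AtomicBoolean(false);
        reader.read(60, System.currentTimeMillis(),
                () -> { granted.set(true); done.countDown(); },
                done::countDown);
        budget.release(100); // memory is freed; a later retry succeeds
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            reader.shutdown();
        }
        return granted.get() && budget.available() == 40;
    }
}
```

   The retry loop keeps re-submitting until memory is available or the timeout 
elapses. Other tasks for the same ML are interleaved in the executor queue 
between retries.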



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
