michaeljmarshall commented on code in PR #17953:
URL: https://github.com/apache/pulsar/pull/17953#discussion_r995980167


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -437,6 +458,60 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
+    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
+                                                                        long firstEntry, long lastEntry,
+                                                                        boolean shouldCacheEntry,
+                                                                        AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                        Object ctx, PendingReadsLimiter.Handle handle) {
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        PendingReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed reading from ledger "
+                        + lh.getId()
+                        + ", " + rangeEntryCache.getName()
+                        + ", estimated read size " + estimatedReadSize + " bytes"
+                        + " for " + (1 + lastEntry - firstEntry) + " entries";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;

Review Comment:
   To clarify: your concern about fairness is not about having a global FIFO, but about ensuring fair QoS across independent subscriptions, right? A global queue would be fair in the FIFO sense, but could become a performance bottleneck. However, we'd only ever queue pending reads when we're over the memory threshold, so at that point performance is already going to be degraded.
   
   One potential option for ensuring fairness is to delegate it to the rate limiting solution.
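   To make that concrete, here is a minimal, hypothetical sketch of what "fairness via rate limiting" could look like: a tiny per-subscription token bucket, so each subscription has its own budget and cannot starve the others. The class and method names are invented for illustration and do not correspond to Pulsar's dispatch rate limiter API.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical sketch: one token bucket per subscription, so no single
   // subscription can starve the others. Not the Pulsar implementation.
   public class PerSubscriptionRateSketch {
       static final class TokenBucket {
           private final long capacity;
           private final double refillPerMs;
           private double tokens;
           private long lastRefillMs;
   
           TokenBucket(long capacity, double refillPerMs, long nowMs) {
               this.capacity = capacity;
               this.refillPerMs = refillPerMs;
               this.tokens = capacity;       // bucket starts full
               this.lastRefillMs = nowMs;
           }
   
           synchronized boolean tryConsume(long n, long nowMs) {
               // Refill proportionally to elapsed time, capped at capacity.
               tokens = Math.min(capacity, tokens + (nowMs - lastRefillMs) * refillPerMs);
               lastRefillMs = nowMs;
               if (tokens < n) {
                   return false;
               }
               tokens -= n;
               return true;
           }
       }
   
       // One bucket per subscription; created lazily on first use.
       private final ConcurrentHashMap<String, TokenBucket> buckets = new ConcurrentHashMap<>();
   
       boolean tryConsume(String subscription, long n, long nowMs) {
           return buckets.computeIfAbsent(subscription,
                   k -> new TokenBucket(10, 0.001, nowMs)).tryConsume(n, nowMs);
       }
   
       public static void main(String[] args) {
           PerSubscriptionRateSketch limiter = new PerSubscriptionRateSketch();
           System.out.println(limiter.tryConsume("sub-a", 10, 0)); // true: bucket starts full
           System.out.println(limiter.tryConsume("sub-a", 1, 0));  // false: sub-a exhausted its budget
           System.out.println(limiter.tryConsume("sub-b", 10, 0)); // true: sub-b has its own bucket
       }
   }
   ```
   
   The "now" timestamp is passed in explicitly so the behavior is deterministic; a real limiter would read the clock itself.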
   
   > With the current mechanism we are scheduling the read on the Thread that 
is pinned to the ManagerLedger, and this naturally adds some kind of back 
pressure depending on the demands of reads of the ML.
   
   One potential issue with this solution is that the managed ledger thread is already responsible for a lot. The rescheduled task has no backoff, so it will run very frequently and could become expensive if there is contention on the PendingReadsLimiter lock.
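   For illustration, here is a minimal sketch of what retrying with exponential backoff could look like, using a toy permit pool in place of `PendingReadsLimiter`. The names (`SimpleLimiter`, `retryWithBackoff`) are invented for this sketch, not part of the PR.
   
   ```java
   import java.util.concurrent.*;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical sketch: when acquisition fails, reschedule with a growing,
   // capped delay instead of resubmitting immediately, reducing contention
   // on the limiter. Not the actual Pulsar code.
   public class BackoffRetrySketch {
       // A toy permit pool standing in for PendingReadsLimiter.
       static final class SimpleLimiter {
           private final AtomicLong available;
           SimpleLimiter(long permits) { this.available = new AtomicLong(permits); }
           boolean tryAcquire(long size) {
               while (true) {
                   long cur = available.get();
                   if (cur < size) return false;
                   if (available.compareAndSet(cur, cur - size)) return true;
               }
           }
           void release(long size) { available.addAndGet(size); }
       }
   
       static void retryWithBackoff(ScheduledExecutorService scheduler, SimpleLimiter limiter,
                                    long size, long delayMs, long maxDelayMs, Runnable onAcquired) {
           if (limiter.tryAcquire(size)) {
               onAcquired.run();
           } else {
               long nextDelay = Math.min(delayMs * 2, maxDelayMs); // exponential backoff, capped
               scheduler.schedule(
                       () -> retryWithBackoff(scheduler, limiter, size, nextDelay, maxDelayMs, onAcquired),
                       delayMs, TimeUnit.MILLISECONDS);
           }
       }
   
       public static void main(String[] args) throws Exception {
           ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
           SimpleLimiter limiter = new SimpleLimiter(100);
           CountDownLatch done = new CountDownLatch(1);
           // Exhaust the pool, then release later so a retry eventually succeeds.
           limiter.tryAcquire(100);
           scheduler.schedule(() -> limiter.release(100), 20, TimeUnit.MILLISECONDS);
           retryWithBackoff(scheduler, limiter, 50, 1, 16, done::countDown);
           boolean acquired = done.await(2, TimeUnit.SECONDS);
           System.out.println("acquired=" + acquired);
           scheduler.shutdown();
       }
   }
   ```
   
   The trade-off is added latency for the waiting read versus fewer wasted wake-ups on the pinned executor when the pool stays exhausted.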
   
   > With this approach basically each ML will have its own list of waiting 
pending reads (the internal queue of the pinned executor).
   
   It seems to me the current solution has a race where each ML competes for the memory permits, and nothing in that race prevents a single ML from acquiring all of them.
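   One way to bound that, sketched below purely for illustration: cap each owner's outstanding share of the pool, so one ML can never drain it entirely. This is not the Pulsar implementation; the class and method names are hypothetical.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical sketch of a per-owner cap on top of a shared permit pool:
   // each owner (e.g. a managed ledger) may hold at most perOwnerCap bytes
   // of outstanding reads, regardless of what is globally available.
   public class PerOwnerCapSketch {
       private final AtomicLong available;
       private final long perOwnerCap;
       private final ConcurrentHashMap<String, AtomicLong> usedByOwner = new ConcurrentHashMap<>();
   
       PerOwnerCapSketch(long totalPermits, long perOwnerCap) {
           this.available = new AtomicLong(totalPermits);
           this.perOwnerCap = perOwnerCap;
       }
   
       boolean tryAcquire(String owner, long size) {
           AtomicLong used = usedByOwner.computeIfAbsent(owner, k -> new AtomicLong());
           if (used.addAndGet(size) > perOwnerCap) {   // would exceed this owner's share
               used.addAndGet(-size);
               return false;
           }
           if (available.addAndGet(-size) < 0) {       // global pool exhausted
               available.addAndGet(size);
               used.addAndGet(-size);
               return false;
           }
           return true;
       }
   
       void release(String owner, long size) {
           usedByOwner.get(owner).addAndGet(-size);
           available.addAndGet(size);
       }
   
       public static void main(String[] args) {
           PerOwnerCapSketch limiter = new PerOwnerCapSketch(100, 60);
           System.out.println(limiter.tryAcquire("ml-1", 50)); // true
           System.out.println(limiter.tryAcquire("ml-1", 50)); // false: would exceed ml-1's cap
           System.out.println(limiter.tryAcquire("ml-2", 50)); // true: ml-2 still has headroom
       }
   }
   ```
   
   A cap like this trades some global utilization (an idle owner's share goes unused) for a guarantee that no single ML monopolizes the permits.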



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to