eolivelli commented on code in PR #17953:
URL: https://github.com/apache/pulsar/pull/17953#discussion_r994749999
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -437,6 +458,60 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             }
         }
+    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
+                                                                        long firstEntry, long lastEntry,
+                                                                        boolean shouldCacheEntry,
+                                                                        AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                        Object ctx, PendingReadsLimiter.Handle handle) {
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        PendingReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed reading from ledger "
+                        + lh.getId()
+                        + ", " + rangeEntryCache.getName()
+                        + ", estimated read size " + estimatedReadSize + " bytes"
+                        + " for " + (1 + lastEntry - firstEntry) + " entries";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;
Review Comment:
> Can we make this reactive by queuing cursors that have requested more entries and then feed those cursors as memory gets freed?

I have thought about creating some kind of "async rate limiter", but in the end you would have to keep a list (or priority queue) of the reads that are waiting for the required amount of permits to become available.
Then the problem would be to implement some "fair" algorithm (rough sketch below) that:
- does not let the pending reads starve
- tries to be "fair" and serves the pending reads in FIFO order.
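
Just to illustrate what I mean, a minimal sketch of such an "async rate limiter" could look like the following. This is not code from this PR or from Pulsar; `AsyncReadLimiter` and `PendingRead` are invented names.

```java
// Illustration only: AsyncReadLimiter and PendingRead are made-up names,
// this is not code from this PR or from Pulsar.
import java.util.ArrayDeque;
import java.util.Queue;

class AsyncReadLimiter {
    private long availablePermits;
    // The single shared FIFO queue of reads waiting for permits: this is the
    // global state that is hard to keep "fair" across all the ManagedLedgers.
    private final Queue<PendingRead> waiting = new ArrayDeque<>();

    AsyncReadLimiter(long maxPermits) {
        this.availablePermits = maxPermits;
    }

    private static final class PendingRead {
        final long permits;
        final Runnable readAction;
        PendingRead(long permits, Runnable readAction) {
            this.permits = permits;
            this.readAction = readAction;
        }
    }

    // Runs the read right away if nothing is queued and there are enough permits,
    // otherwise parks it at the tail of the queue (so newcomers cannot jump ahead).
    synchronized void acquireAsync(long permits, Runnable readAction) {
        if (waiting.isEmpty() && permits <= availablePermits) {
            availablePermits -= permits;
            readAction.run();
        } else {
            waiting.add(new PendingRead(permits, readAction));
        }
    }

    // Returning permits drains the queue head in FIFO order.
    // (Simplified: a real implementation should not run callbacks under the lock.)
    synchronized void release(long permits) {
        availablePermits += permits;
        while (!waiting.isEmpty() && waiting.peek().permits <= availablePermits) {
            PendingRead next = waiting.poll();
            availablePermits -= next.permits;
            next.readAction.run();
        }
    }
}
```

Even this minimal version needs a single shared FIFO queue plus a lock around it, and every release has to scan the queue: exactly the kind of global "fairness" bookkeeping I would like to avoid.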
With the current mechanism we are scheduling the read on the thread that is pinned to the ManagedLedger, and this naturally adds some back pressure that depends on the read demand of that ML.
Within one broker all the MLs compete for the available memory, and I don't want to keep a single global list of pending reads, as it would be really hard to make it "fair".
With this approach basically each ML has its own list of waiting pending reads (the internal queue of the pinned executor).
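
To make that last point concrete, here is a toy sketch (again not real Pulsar code; `PerLedgerOrderedExecutor` is an invented name) of why an executor pinned per key gives each ML its own FIFO queue of retried reads:

```java
// Toy illustration only: PerLedgerOrderedExecutor is an invented name, and one
// single-threaded executor per key only approximates the executor pinned to a ManagedLedger.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerLedgerOrderedExecutor {
    private final ConcurrentHashMap<Long, ExecutorService> pinned = new ConcurrentHashMap<>();

    // All tasks submitted for the same ledger id run on the same single thread,
    // so each ledger effectively gets its own FIFO queue of waiting reads
    // instead of one broker-wide queue shared by every ML.
    void submitOrdered(long ledgerId, Runnable task) {
        pinned.computeIfAbsent(ledgerId, id -> Executors.newSingleThreadExecutor())
              .execute(task);
    }
}
```

In the change above the retried read is re-submitted with `submitOrdered(lh.getId(), ...)` on the ML's executor, so it just lands at the tail of that per-ML queue.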