michaeljmarshall commented on code in PR #17953:
URL: https://github.com/apache/pulsar/pull/17953#discussion_r995980167
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -437,6 +458,60 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
+    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
+                                                                        long firstEntry, long lastEntry,
+                                                                        boolean shouldCacheEntry,
+                                                                        AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                        Object ctx, PendingReadsLimiter.Handle handle) {
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        PendingReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed reading from ledger "
+                        + lh.getId()
+                        + ", " + rangeEntryCache.getName()
+                        + ", estimated read size " + estimatedReadSize + " bytes"
+                        + " for " + (1 + lastEntry - firstEntry) + " entries";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;
Review Comment:
To clarify: your concern about fairness is not about having a global FIFO, but about ensuring that QoS across independent subscriptions is fair, right? A global queue would be fair in the sense that it is FIFO, but it could risk becoming a performance bottleneck. However, we'd only ever queue pending reads once we're over the memory threshold, so performance is already degraded at that point. One potential option for ensuring fairness is to delegate it to the rate-limiting solution.
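To picture that option, here is a purely illustrative sketch (not Pulsar's actual limiter or rate-limiting API): give each subscription its own token bucket, so a hot subscription can only exhaust its own read budget while other subscriptions keep theirs. All class names, parameters, and values below are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-subscription token-bucket limiter: fairness comes from
// each subscription having an independent budget, not from a shared queue.
public class PerSubscriptionRateLimiter {
    private final long capacity;        // max tokens (bytes) per subscription
    private final long refillPerSecond; // tokens restored per second
    private final Map<String, Bucket> buckets = new HashMap<>();

    private static final class Bucket {
        long tokens;
        long lastRefillNanos;
    }

    PerSubscriptionRateLimiter(long capacity, long refillPerSecond) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
    }

    // Caller passes the clock in (nowNanos) to keep the sketch testable.
    synchronized boolean tryAcquire(String subscription, long bytes, long nowNanos) {
        Bucket b = buckets.computeIfAbsent(subscription, k -> {
            Bucket nb = new Bucket();
            nb.tokens = capacity;       // new subscriptions start with a full bucket
            nb.lastRefillNanos = nowNanos;
            return nb;
        });
        // Refill proportionally to elapsed time, capped at capacity.
        long elapsed = nowNanos - b.lastRefillNanos;
        long refill = elapsed * refillPerSecond / 1_000_000_000L;
        if (refill > 0) {
            b.tokens = Math.min(capacity, b.tokens + refill);
            b.lastRefillNanos = nowNanos;
        }
        if (b.tokens >= bytes) {
            b.tokens -= bytes;
            return true;
        }
        return false;
    }
}
```

With this shape, one subscription draining its bucket does not affect another subscription's ability to acquire, which is the fairness property a single shared pool cannot give.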
> With the current mechanism we are scheduling the read on the Thread that is pinned to the ManagedLedger, and this naturally adds some kind of back pressure depending on the demands of reads of the ML.
One potential issue with this solution is that the managed ledger thread is already responsible for a lot. The rescheduled task has no back-off, so it will run very frequently and could become expensive if there is contention on the PendingReadsLimiter lock.
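One way to avoid the tight resubmission loop would be to reschedule the retry after a capped exponential back-off delay rather than immediately. A minimal sketch, assuming hypothetical constants and method names (this is not Pulsar code; the computed delay would feed something like `ScheduledExecutorService.schedule(...)`):

```java
// Hypothetical back-off policy for retrying a failed permit acquisition:
// the delay doubles with each failed attempt, capped at MAX_DELAY_MS, so
// retries under sustained contention become progressively cheaper.
public class BackoffSketch {
    static final long BASE_DELAY_MS = 1;   // delay for the first retry
    static final long MAX_DELAY_MS = 100;  // ceiling on the retry delay

    static long backoffDelayMs(int attempt) {
        // Cap the shift amount to avoid overflowing the long.
        long delay = BASE_DELAY_MS << Math.min(attempt, 30);
        return Math.min(delay, MAX_DELAY_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println(backoffDelayMs(attempt));
        }
    }
}
```

The retry would then be scheduled with `executor.schedule(task, backoffDelayMs(attempt), TimeUnit.MILLISECONDS)` instead of `submitOrdered(...)`, trading a little latency for much less pressure on the limiter lock and the pinned thread.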
> With this approach basically each ML will have its own list of waiting
pending reads (the internal queue of the pinned executor).
It seems to me the current solution has a race where each ML competes for the memory permits, and nothing in that race prevents a single ML from acquiring all of them.
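For contrast, one option for that race (again a sketch, not the PendingReadsLimiter API) is a limiter backed by `java.util.concurrent.Semaphore` in fair mode, which hands permits to blocked acquirers in FIFO order. Note the caveat in the JDK docs: fairness only applies to blocking `acquire()`; `tryAcquire()` barges even on a fair semaphore, so a real fix would need blocking acquisition or explicit queuing.

```java
import java.util.concurrent.Semaphore;

// Illustrative memory-permit limiter: one permit == one byte of read budget.
// The 'true' flag requests FIFO fairness for threads blocked in acquire().
public class FairReadLimiter {
    private final Semaphore permits;

    FairReadLimiter(int maxReadBytes) {
        this.permits = new Semaphore(maxReadBytes, true);
    }

    // Non-blocking path, as in the PR: may fail, and barges past waiters.
    boolean tryAcquire(int estimatedReadSize) {
        return permits.tryAcquire(estimatedReadSize);
    }

    // Blocking path: this is where the fair FIFO ordering actually applies.
    void acquireBlocking(int estimatedReadSize) throws InterruptedException {
        permits.acquire(estimatedReadSize);
    }

    void release(int estimatedReadSize) {
        permits.release(estimatedReadSize);
    }
}
```

The permit accounting itself is straightforward; the interesting design question the review raises is whether the waiting happens in a shared fair queue (this sketch's blocking path) or per-ML on the pinned executor (the PR's approach).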
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]