lhotari commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933491175
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,38 +274,125 @@ public void asyncReadEntry(ReadHandle lh, long
firstEntry, long lastEntry, boole
@SuppressWarnings({ "unchecked", "rawtypes" })
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
- final ReadEntriesCallback callback, Object ctx, boolean
withLimits) {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, null, withLimits);
+ final ReadEntriesCallback callback, Object ctx, boolean
acquirePermits) {
+ final long ledgerId = lh.getId();
+ final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+ final Position firstPosition = PositionFactory.create(ledgerId,
firstEntry);
+ final Position lastPosition = PositionFactory.create(ledgerId,
lastEntry);
+ asyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry, callback, ctx,
+ acquirePermits);
}
- void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long
lastEntry, boolean shouldCacheEntry,
- final ReadEntriesCallback originalCallback, Object ctx,
InflightReadsLimiter.Handle handle,
- boolean withLimits) {
- AsyncCallbacks.ReadEntriesCallback callback;
- if (withLimits) {
- callback = handlePendingReadsLimits(lh, firstEntry, lastEntry,
shouldCacheEntry, originalCallback, ctx,
- handle);
+ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition,
Position lastPosition, int numberOfEntries,
+ boolean shouldCacheEntry, final
ReadEntriesCallback originalCallback,
+ Object ctx, boolean acquirePermits) {
+ checkArgument(firstPosition.getLedgerId() ==
lastPosition.getLedgerId(),
+ "Invalid range. Entries %s and %s should be in the same
ledger.",
+ firstPosition, lastPosition);
+ checkArgument(firstPosition.getLedgerId() == lh.getId(),
+ "Invalid ReadHandle. The ledger %s of the range positions
should match the handle's ledger %s.",
+ firstPosition.getLedgerId(), lh.getId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Reading {} entries in range {} to {}",
ml.getName(), numberOfEntries, firstPosition,
+ lastPosition);
+ }
+
+ InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+ if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+ doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry,
+ originalCallback, ctx);
} else {
- callback = originalCallback;
+ long estimatedEntrySize = getEstimatedEntrySize();
+ long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+ if (log.isDebugEnabled()) {
+ log.debug("Estimated read size: {} bytes for {} entries with
{} estimated entry size",
+ estimatedReadSize,
+ numberOfEntries, estimatedEntrySize);
+ }
+ Optional<InflightReadsLimiter.Handle> optionalHandle =
+ pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+ // permits were not immediately available, callback
will be executed when permits are acquired
+ // or timeout
+ ml.getExecutor().execute(() -> {
+ doAsyncReadEntriesWithAcquiredPermits(lh,
firstPosition, lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
handle, estimatedReadSize);
+ });
+ });
+ // permits were immediately available and acquired
+ if (optionalHandle.isPresent()) {
+ doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition,
lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
optionalHandle.get(), estimatedReadSize);
+ }
}
- if (callback == null) {
+ }
+
+ void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position
firstPosition, Position lastPosition,
+ int numberOfEntries, boolean
shouldCacheEntry,
+ final ReadEntriesCallback
originalCallback, Object ctx,
+ InflightReadsLimiter.Handle
handle, long estimatedReadSize) {
+ if (!handle.success) {
+ String message = "Couldn't acquire enough permits "
Review Comment:
Yes, that makes it cleaner.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]