merlimat commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933072176
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -337,89 +403,36 @@ void asyncReadEntry0WithLimits(ReadHandle lh, long
firstEntry, long lastEntry, b
manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(),
totalCachedSize);
if (log.isDebugEnabled()) {
- log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}",
ml.getName(), ledgerId, firstEntry,
- lastEntry);
+ log.debug("[{}] Cache hit for {} entries in range {} to {}",
ml.getName(), numberOfEntries,
+ firstPosition, lastPosition);
}
- callback.readEntriesComplete((List) entriesToReturn, ctx);
+ callback.readEntriesComplete(entriesToReturn, ctx);
} else {
if (!cachedEntries.isEmpty()) {
cachedEntries.forEach(entry -> entry.release());
}
// Read all the entries from bookkeeper
- pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+ pendingReadsManager.readEntries(lh, firstPosition.getEntryId(),
lastPosition.getEntryId(),
shouldCacheEntry, callback, ctx);
-
}
}
- private AsyncCallbacks.ReadEntriesCallback
handlePendingReadsLimits(ReadHandle lh,
- long
firstEntry, long lastEntry,
- boolean
shouldCacheEntry,
-
AsyncCallbacks.ReadEntriesCallback originalCallback,
- Object ctx,
InflightReadsLimiter.Handle handle) {
- InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
- if (pendingReadsLimiter.isDisabled()) {
- return originalCallback;
+ @VisibleForTesting
+ public long getEstimatedEntrySize() {
+ long estimatedEntrySize = getAvgEntrySize();
+ if (estimatedEntrySize == 0) {
+ estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
}
- long estimatedReadSize = (1 + lastEntry - firstEntry)
- * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
- final AsyncCallbacks.ReadEntriesCallback callback;
- InflightReadsLimiter.Handle newHandle =
pendingReadsLimiter.acquire(estimatedReadSize, handle);
- if (!newHandle.success) {
- long now = System.currentTimeMillis();
- if (now - newHandle.creationTime > readEntryTimeoutMillis) {
- String message = "Time-out elapsed while acquiring enough
permits "
- + "on the memory limiter to read from ledger "
- + lh.getId()
- + ", " + getName()
- + ", estimated read size " + estimatedReadSize + "
bytes"
- + " for " + (1 + lastEntry - firstEntry)
- + " entries (check
managedLedgerMaxReadsInFlightSizeInMB)";
- log.error(message);
- pendingReadsLimiter.release(newHandle);
- originalCallback.readEntriesFailed(
- new
ManagedLedgerException.TooManyRequestsException(message), ctx);
- return null;
- }
- ml.getExecutor().execute(() -> {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry,
shouldCacheEntry,
- originalCallback, ctx, newHandle, true);
- });
- return null;
- } else {
- callback = new AsyncCallbacks.ReadEntriesCallback() {
-
- @Override
- public void readEntriesComplete(List<Entry> entries, Object
ctx) {
- if (!entries.isEmpty()) {
- long size = entries.get(0).getLength();
- estimatedEntrySize = size;
-
- AtomicInteger remainingCount = new
AtomicInteger(entries.size());
- for (Entry entry : entries) {
- ((EntryImpl) entry).onDeallocate(() -> {
- if (remainingCount.decrementAndGet() <= 0) {
- pendingReadsLimiter.release(newHandle);
- }
- });
- }
- } else {
- pendingReadsLimiter.release(newHandle);
- }
- originalCallback.readEntriesComplete(entries, ctx);
- }
+ return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ }
- @Override
- public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
- pendingReadsLimiter.release(newHandle);
- originalCallback.readEntriesFailed(exception, ctx);
- }
- };
- }
- return callback;
+ private long getAvgEntrySize() {
Review Comment:
Is this going to get called frequently? Using the `LongAdder.sum()` might
have a perf impact
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,38 +274,125 @@ public void asyncReadEntry(ReadHandle lh, long
firstEntry, long lastEntry, boole
@SuppressWarnings({ "unchecked", "rawtypes" })
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
- final ReadEntriesCallback callback, Object ctx, boolean
withLimits) {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, null, withLimits);
+ final ReadEntriesCallback callback, Object ctx, boolean
acquirePermits) {
+ final long ledgerId = lh.getId();
+ final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+ final Position firstPosition = PositionFactory.create(ledgerId,
firstEntry);
+ final Position lastPosition = PositionFactory.create(ledgerId,
lastEntry);
+ asyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry, callback, ctx,
+ acquirePermits);
}
- void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long
lastEntry, boolean shouldCacheEntry,
- final ReadEntriesCallback originalCallback, Object ctx,
InflightReadsLimiter.Handle handle,
- boolean withLimits) {
- AsyncCallbacks.ReadEntriesCallback callback;
- if (withLimits) {
- callback = handlePendingReadsLimits(lh, firstEntry, lastEntry,
shouldCacheEntry, originalCallback, ctx,
- handle);
+ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition,
Position lastPosition, int numberOfEntries,
+ boolean shouldCacheEntry, final
ReadEntriesCallback originalCallback,
+ Object ctx, boolean acquirePermits) {
+ checkArgument(firstPosition.getLedgerId() ==
lastPosition.getLedgerId(),
+ "Invalid range. Entries %s and %s should be in the same
ledger.",
+ firstPosition, lastPosition);
+ checkArgument(firstPosition.getLedgerId() == lh.getId(),
+ "Invalid ReadHandle. The ledger %s of the range positions
should match the handle's ledger %s.",
+ firstPosition.getLedgerId(), lh.getId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Reading {} entries in range {} to {}",
ml.getName(), numberOfEntries, firstPosition,
+ lastPosition);
+ }
+
+ InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+ if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+ doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry,
+ originalCallback, ctx);
} else {
- callback = originalCallback;
+ long estimatedEntrySize = getEstimatedEntrySize();
+ long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+ if (log.isDebugEnabled()) {
+ log.debug("Estimated read size: {} bytes for {} entries with
{} estimated entry size",
+ estimatedReadSize,
+ numberOfEntries, estimatedEntrySize);
+ }
+ Optional<InflightReadsLimiter.Handle> optionalHandle =
+ pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+ // permits were not immediately available, callback
will be executed when permits are acquired
+ // or timeout
+ ml.getExecutor().execute(() -> {
+ doAsyncReadEntriesWithAcquiredPermits(lh,
firstPosition, lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
handle, estimatedReadSize);
+ });
+ });
+ // permits were immediately available and acquired
+ if (optionalHandle.isPresent()) {
+ doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition,
lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
optionalHandle.get(), estimatedReadSize);
+ }
}
- if (callback == null) {
+ }
+
+ void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position
firstPosition, Position lastPosition,
+ int numberOfEntries, boolean
shouldCacheEntry,
+ final ReadEntriesCallback
originalCallback, Object ctx,
+ InflightReadsLimiter.Handle
handle, long estimatedReadSize) {
+ if (!handle.success) {
+ String message = "Couldn't acquire enough permits "
Review Comment:
nit: maybe use `String.format()` ?
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -232,8 +232,66 @@ public PendingRead(PendingReadKey key,
this.ledgerCache = ledgerCache;
}
- private List<EntryImpl> keepEntries(List<EntryImpl> list, long
startEntry, long endEntry) {
- List<EntryImpl> result = new ArrayList<>((int) (endEntry -
startEntry));
+ public void attach(CompletableFuture<List<EntryImpl>> handle) {
+ handle.whenComplete((entriesToReturn, error) -> {
+ // execute in the completing thread
+ completeAndRemoveFromCache();
+ // execute the callbacks in the managed ledger executor
+ rangeEntryCache.getManagedLedger().getExecutor().execute(() ->
{
+ if (error != null) {
+ readEntriesFailed(error);
+ } else {
+ readEntriesComplete(entriesToReturn);
+ }
+ });
+ });
+ }
+
+ private synchronized void completeAndRemoveFromCache() {
+ completed = true;
+ // When the read has completed, remove the instance from the
ledgerCache map
+ // so that new reads will go to a new instance.
+ // this is required because we are going to do refcount management
+ // on the results of the callback
+ synchronized (ledgerCache) {
Review Comment:
Could this double-lock end up in deadlock with inverse sequence of locking?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]