This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 37dbf7254e64d04a3f71baa904ddcfa0d0e227bb Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sat May 31 01:11:01 2025 +0800 [improve][broker]Improve the log when encountered in-flight read limitation (#24359) --- .../mledger/impl/cache/InflightReadsLimiter.java | 23 ++++++++++++++-------- .../mledger/impl/cache/RangeEntryCacheImpl.java | 5 ++--- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 1d5e0a3568f..cda8e975544 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -177,8 +177,13 @@ public class InflightReadsLimiter implements AutoCloseable { return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true)); } else { if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) { - log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}", - permits, handle.creationTime, remainingBytes); + log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}," + + " maxReadsInFlightAcquireQueueSize:{}, pending-queue-size: {}, please increase broker" + + " config managedLedgerMaxReadsInFlightPermitsAcquireQueueSize and confirm the configuration of" + + " managedLedgerMaxReadsInFlightSizeInMB and" + + " managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis are suitable.", + permits, handle.creationTime, remainingBytes, maxReadsInFlightAcquireQueueSize, + queuedHandles.size()); return Optional.of(new Handle(0, handle.creationTime, false)); } else { queuedHandles.offer(new QueuedHandle(handle, callback)); @@ -223,15 +228,17 @@ public class InflightReadsLimiter implements AutoCloseable { } private void handleTimeout(QueuedHandle queuedHandle) { - if (log.isDebugEnabled()) { - log.debug("timed out queued permits: {}, creationTime: {}, remainingBytes:{}", - queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes); - } + log.warn("timed out queued permits: {}, creationTime: {}, remainingBytes:{}, acquireTimeoutMillis: {}. Please" + + " review whether the BK read requests is fast enough or broker config" + + " managedLedgerMaxReadsInFlightSizeInMB and managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis" + + " are suitable", + queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, acquireTimeoutMillis); try { queuedHandle.callback.accept(new Handle(0, queuedHandle.handle.creationTime, false)); } catch (Exception e) { - log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}", - queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, e); + log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}," + + " acquireTimeoutMillis: {}", + queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, acquireTimeoutMillis, e); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index fdcff97bd45..ad5630c078a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -335,11 +335,10 @@ public class RangeEntryCacheImpl implements EntryCache { String message = String.format( "Couldn't acquire enough permits on the max reads in flight limiter to read from ledger " + "%d, %s, estimated read size %d bytes for %d entries (check " - + "managedLedgerMaxReadsInFlightSizeInMB, " + + "managedLedgerMaxReadsInFlightPermitsAcquireQueueSize (direct config), " + "managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and " - + "managedLedgerMaxReadsInFlightPermitsAcquireQueueSize)", lh.getId(), getName(), + + "managedLedgerMaxReadsInFlightSizeInMB)", lh.getId(), getName(), estimatedReadSize, numberOfEntries); - log.error(message); originalCallback.readEntriesFailed(new ManagedLedgerException.TooManyRequestsException(message), ctx); return; }