This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7eb708940edf1e059d6f8f7218fff8f458706246 Author: fengyubiao <[email protected]> AuthorDate: Sat May 31 01:11:01 2025 +0800 [improve][broker]Improve the log when encountered in-flight read limitation (#24359) (cherry picked from commit 9c504f59b7431475f46858dc1bc10775648287dc) --- .../mledger/impl/cache/InflightReadsLimiter.java | 23 ++++++++++++++-------- .../mledger/impl/cache/RangeEntryCacheImpl.java | 5 ++--- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index c2b9b82b694..dbec022eb71 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -132,8 +132,13 @@ public class InflightReadsLimiter { return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true)); } else { if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) { - log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}", - permits, handle.creationTime, remainingBytes); + log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}," + + " maxReadsInFlightAcquireQueueSize:{}, pending-queue-size: {}, please increase broker" + + " config managedLedgerMaxReadsInFlightPermitsAcquireQueueSize and confirm the configuration of" + + " managedLedgerMaxReadsInFlightSizeInMB and" + + " managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis are suitable.", + permits, handle.creationTime, remainingBytes, maxReadsInFlightAcquireQueueSize, + queuedHandles.size()); return Optional.of(new Handle(0, handle.creationTime, false)); } else { queuedHandles.offer(new QueuedHandle(handle, callback)); @@ -178,15 +183,17 @@ public class InflightReadsLimiter { } private void handleTimeout(QueuedHandle queuedHandle) { - if (log.isDebugEnabled()) { - log.debug("timed out queued permits: {}, creationTime: {}, remainingBytes:{}", - queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes); - } + log.warn("timed out queued permits: {}, creationTime: {}, remainingBytes:{}, acquireTimeoutMillis: {}. Please" + + " review whether the BK read requests is fast enough or broker config" + + " managedLedgerMaxReadsInFlightSizeInMB and managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis" + + " are suitable", + queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, acquireTimeoutMillis); try { queuedHandle.callback.accept(new Handle(0, queuedHandle.handle.creationTime, false)); } catch (Exception e) { - log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}", - queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, e); + log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}," + + " acquireTimeoutMillis: {}", + queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, acquireTimeoutMillis, e); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 6c1a445f9c7..cfaad242d80 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -334,11 +334,10 @@ public class RangeEntryCacheImpl implements EntryCache { String message = String.format( "Couldn't acquire enough permits on the max reads in flight limiter to read from ledger " + "%d, %s, estimated read size %d bytes for %d entries (check " - + "managedLedgerMaxReadsInFlightSizeInMB, " + + "managedLedgerMaxReadsInFlightPermitsAcquireQueueSize (direct config), " + "managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and " - + "managedLedgerMaxReadsInFlightPermitsAcquireQueueSize)", lh.getId(), getName(), + + "managedLedgerMaxReadsInFlightSizeInMB)", lh.getId(), getName(), estimatedReadSize, numberOfEntries); - log.error(message); originalCallback.readEntriesFailed(new ManagedLedgerException.TooManyRequestsException(message), ctx); return; }
