This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 37f0bc22dff [fix][broker] Make InflightReadsLimiter asynchronous and
apply it for replay queue reads (#23901)
37f0bc22dff is described below
commit 37f0bc22dffe6468fc775d68ad1a9c7f9516cf3a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 29 22:11:56 2025 +0200
[fix][broker] Make InflightReadsLimiter asynchronous and apply it for
replay queue reads (#23901)
(cherry picked from commit c5173d5e15efade90afb9b0b1c19f3ba5b3aab37)
---
.../mledger/ManagedLedgerFactoryConfig.java | 12 +
.../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +-
.../mledger/impl/cache/InflightReadsLimiter.java | 250 ++++++---
.../mledger/impl/cache/PendingReadsManager.java | 156 +++---
.../mledger/impl/cache/RangeEntryCacheImpl.java | 298 ++++++-----
.../impl/cache/RangeEntryCacheManagerImpl.java | 15 +-
.../impl/InflightReadsLimiterIntegrationTest.java | 13 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 92 ++--
.../impl/cache/InflightReadsLimiterTest.java | 577 ++++++++++++++++-----
.../impl/cache/PendingReadsManagerTest.java | 4 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 9 +
.../pulsar/broker/ManagedLedgerClientFactory.java | 17 +-
12 files changed, 992 insertions(+), 453 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 5aa4e8374d7..349ec05329f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -62,6 +62,18 @@ public class ManagedLedgerFactoryConfig {
*/
private long managedLedgerMaxReadsInFlightSize = 0;
+ /**
+ * Maximum time to wait for acquiring permits for max reads in flight when
managedLedgerMaxReadsInFlightSizeInMB is
+ * set (>0) and the limit is reached.
+ */
+ private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis =
60000;
+
+ /**
+ * Maximum number of reads that can be queued for acquiring permits for
max reads in flight when
+ * managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is
reached.
+ */
+ private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 10000;
+
/**
* Whether trace managed ledger task execution time.
*/
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index ed6f63c3938..fa0d8255207 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -198,7 +198,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
- this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
+ this.entryCacheManager = new RangeEntryCacheManagerImpl(this,
scheduledExecutor);
this.statsTask =
scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask =
scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index b946dc09a0c..3a6bb3cd039 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -20,9 +20,13 @@ package org.apache.bookkeeper.mledger.impl.cache;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Gauge;
-import lombok.AllArgsConstructor;
-import lombok.ToString;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
+import org.jctools.queues.SpscArrayQueue;
@Slf4j
public class InflightReadsLimiter {
@@ -41,15 +45,35 @@ public class InflightReadsLimiter {
private final long maxReadsInFlightSize;
private long remainingBytes;
+ private final long acquireTimeoutMillis;
+ private final ScheduledExecutorService timeOutExecutor;
+ private final boolean enabled;
- public InflightReadsLimiter(long maxReadsInFlightSize) {
- if (maxReadsInFlightSize <= 0) {
+ record Handle(long permits, long creationTime, boolean success) {
+ }
+
+ record QueuedHandle(Handle handle, Consumer<Handle> callback) {
+ }
+
+ private final Queue<QueuedHandle> queuedHandles;
+ private boolean timeoutCheckRunning = false;
+
+ public InflightReadsLimiter(long maxReadsInFlightSize, int
maxReadsInFlightAcquireQueueSize,
+ long acquireTimeoutMillis,
ScheduledExecutorService timeOutExecutor) {
+ this.maxReadsInFlightSize = maxReadsInFlightSize;
+ this.remainingBytes = maxReadsInFlightSize;
+ this.acquireTimeoutMillis = acquireTimeoutMillis;
+ this.timeOutExecutor = timeOutExecutor;
+ if (maxReadsInFlightSize > 0) {
+ enabled = true;
+ this.queuedHandles = new
SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
+ } else {
+ enabled = false;
+ this.queuedHandles = null;
// set it to -1 in order to show in the metrics that the metric is
not available
PULSAR_ML_READS_BUFFER_SIZE.set(-1);
PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1);
}
- this.maxReadsInFlightSize = maxReadsInFlightSize;
- this.remainingBytes = maxReadsInFlightSize;
}
@VisibleForTesting
@@ -57,70 +81,178 @@ public class InflightReadsLimiter {
return remainingBytes;
}
- @AllArgsConstructor
- @ToString
- static class Handle {
- final long acquiredPermits;
- final boolean success;
- final int trials;
+ private static final Handle DISABLED = new Handle(0, 0, true);
+ private static final Optional<Handle> DISABLED_OPTIONAL =
Optional.of(DISABLED);
- final long creationTime;
+ /**
+ * Acquires permits from the limiter. If the limiter is disabled, it will
immediately return a successful handle.
+ * If permits are available, it will return a handle with the acquired
permits. If no permits are available,
+ * it will return an empty optional and the callback will be called when
permits become available or when the
+ * acquire timeout is reached. The success field in the handle passed to
the callback will be false if the acquire
+ * operation times out. The callback should be non-blocking and run on a
desired executor handled within the
+ * callback itself.
+ *
+ * A successful handle will have the success field set to true, and the
caller must call release with the handle
+ * when the permits are no longer needed.
+ *
+ * If an unsuccessful handle is returned immediately, it means that the
queue limit has been reached and the
+ * callback will not be called. The caller should fail the read operation
in this case to apply backpressure.
+ *
+ * @param permits the number of permits to acquire
+ * @param callback the callback to be called when the permits are acquired
or timed out
+ * @return an optional handle that contains the permits if acquired,
otherwise an empty optional
+ */
+ public Optional<Handle> acquire(long permits, Consumer<Handle> callback) {
+ if (isDisabled()) {
+ return DISABLED_OPTIONAL;
+ }
+ return internalAcquire(permits, callback);
}
- private static final Handle DISABLED = new Handle(0, true, 0, -1);
+ private synchronized Optional<Handle> internalAcquire(long permits,
Consumer<Handle> callback) {
+ Handle handle = new Handle(permits, System.currentTimeMillis(), true);
+ if (remainingBytes >= permits) {
+ remainingBytes -= permits;
+ if (log.isDebugEnabled()) {
+ log.debug("acquired permits: {}, creationTime: {},
remainingBytes:{}", permits, handle.creationTime,
+ remainingBytes);
+ }
+ updateMetrics();
+ return Optional.of(handle);
+ } else if (permits > maxReadsInFlightSize && remainingBytes ==
maxReadsInFlightSize) {
+ remainingBytes = 0;
+ if (log.isInfoEnabled()) {
+ log.info("Requested permits {} exceeded maxReadsInFlightSize
{}, creationTime: {}, remainingBytes:{}. "
+ + "Allowing request with permits set to
maxReadsInFlightSize.",
+ permits, maxReadsInFlightSize, handle.creationTime,
remainingBytes);
+ }
+ updateMetrics();
+ return Optional.of(new Handle(maxReadsInFlightSize,
handle.creationTime, true));
+ } else {
+ if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
+ scheduleTimeOutCheck(acquireTimeoutMillis);
+ return Optional.empty();
+ } else {
+ log.warn("Failed to queue handle for acquiring permits: {},
creationTime: {}, remainingBytes:{}",
+ permits, handle.creationTime, remainingBytes);
+ return Optional.of(new Handle(0, handle.creationTime, false));
+ }
+ }
+ }
- Handle acquire(long permits, Handle current) {
- if (maxReadsInFlightSize <= 0) {
- // feature is disabled
- return DISABLED;
+ private synchronized void scheduleTimeOutCheck(long delayMillis) {
+ if (acquireTimeoutMillis <= 0) {
+ return;
+ }
+ if (!timeoutCheckRunning) {
+ timeoutCheckRunning = true;
+ timeOutExecutor.schedule(this::timeoutCheck, delayMillis,
TimeUnit.MILLISECONDS);
}
- synchronized (this) {
- try {
- if (current == null) {
- if (remainingBytes == 0) {
- return new Handle(0, false, 1,
System.currentTimeMillis());
- }
- if (remainingBytes >= permits) {
- remainingBytes -= permits;
- return new Handle(permits, true, 1,
System.currentTimeMillis());
- } else {
- long possible = remainingBytes;
- remainingBytes = 0;
- return new Handle(possible, false, 1,
System.currentTimeMillis());
- }
+ }
+
+ private synchronized void timeoutCheck() {
+ timeoutCheckRunning = false;
+ long delay = 0;
+ while (true) {
+ QueuedHandle queuedHandle = queuedHandles.peek();
+ if (queuedHandle != null) {
+ long age = System.currentTimeMillis() -
queuedHandle.handle.creationTime;
+ if (age >= acquireTimeoutMillis) {
+ // remove the peeked handle from the queue
+ queuedHandles.poll();
+ handleTimeout(queuedHandle);
} else {
- if (current.trials >= 4 && current.acquiredPermits > 0) {
- remainingBytes += current.acquiredPermits;
- return new Handle(0, false, 1, current.creationTime);
- }
- if (remainingBytes == 0) {
- return new Handle(current.acquiredPermits, false,
current.trials + 1,
- current.creationTime);
- }
- long needed = permits - current.acquiredPermits;
- if (remainingBytes >= needed) {
- remainingBytes -= needed;
- return new Handle(permits, true, current.trials + 1,
current.creationTime);
- } else {
- long possible = remainingBytes;
- remainingBytes = 0;
- return new Handle(current.acquiredPermits + possible,
false,
- current.trials + 1, current.creationTime);
- }
+ delay = acquireTimeoutMillis - age;
+ break;
}
- } finally {
- updateMetrics();
+ } else {
+ break;
}
}
+ if (delay > 0) {
+ scheduleTimeOutCheck(delay);
+ }
+ }
+
+ private void handleTimeout(QueuedHandle queuedHandle) {
+ if (log.isDebugEnabled()) {
+ log.debug("timed out queued permits: {}, creationTime: {},
remainingBytes:{}",
+ queuedHandle.handle.permits,
queuedHandle.handle.creationTime, remainingBytes);
+ }
+ try {
+ queuedHandle.callback.accept(new Handle(0,
queuedHandle.handle.creationTime, false));
+ } catch (Exception e) {
+ log.error("Error in callback of timed out queued permits: {},
creationTime: {}, remainingBytes:{}",
+ queuedHandle.handle.permits,
queuedHandle.handle.creationTime, remainingBytes, e);
+ }
}
- void release(Handle handle) {
+ /**
+ * Releases permits back to the limiter. If the handle is disabled, this
method will be a no-op.
+ *
+ * @param handle the handle containing the permits to release
+ */
+ public void release(Handle handle) {
if (handle == DISABLED) {
return;
}
- synchronized (this) {
- remainingBytes += handle.acquiredPermits;
- updateMetrics();
+ internalRelease(handle);
+ }
+
+ private synchronized void internalRelease(Handle handle) {
+ if (log.isDebugEnabled()) {
+ log.debug("release permits: {}, creationTime: {},
remainingBytes:{}", handle.permits,
+ handle.creationTime, getRemainingBytes());
+ }
+ remainingBytes += handle.permits;
+ while (true) {
+ QueuedHandle queuedHandle = queuedHandles.peek();
+ if (queuedHandle != null) {
+ boolean timedOut = acquireTimeoutMillis > 0
+ && System.currentTimeMillis() -
queuedHandle.handle.creationTime > acquireTimeoutMillis;
+ if (timedOut) {
+ // remove the peeked handle from the queue
+ queuedHandles.poll();
+ handleTimeout(queuedHandle);
+ } else if (remainingBytes >= queuedHandle.handle.permits
+ || queuedHandle.handle.permits > maxReadsInFlightSize
+ && remainingBytes == maxReadsInFlightSize) {
+ // remove the peeked handle from the queue
+ queuedHandles.poll();
+ handleQueuedHandle(queuedHandle);
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ updateMetrics();
+ }
+
+ private void handleQueuedHandle(QueuedHandle queuedHandle) {
+ long permits = queuedHandle.handle.permits;
+ Handle handleForCallback = queuedHandle.handle;
+ if (permits > maxReadsInFlightSize && remainingBytes ==
maxReadsInFlightSize) {
+ remainingBytes = 0;
+ if (log.isInfoEnabled()) {
+ log.info("Requested permits {} exceeded maxReadsInFlightSize
{}, creationTime: {}, remainingBytes:{}. "
+ + "Allowing request with permits set to
maxReadsInFlightSize.",
+ permits, maxReadsInFlightSize,
queuedHandle.handle.creationTime, remainingBytes);
+ }
+ handleForCallback = new Handle(maxReadsInFlightSize,
queuedHandle.handle.creationTime, true);
+ } else {
+ remainingBytes -= permits;
+ if (log.isDebugEnabled()) {
+ log.debug("acquired queued permits: {}, creationTime: {},
remainingBytes:{}",
+ permits, queuedHandle.handle.creationTime,
remainingBytes);
+ }
+ }
+ try {
+ queuedHandle.callback.accept(handleForCallback);
+ } catch (Exception e) {
+ log.error("Error in callback of acquired queued permits: {},
creationTime: {}, remainingBytes:{}",
+ handleForCallback.permits, handleForCallback.creationTime,
remainingBytes, e);
}
}
@@ -130,8 +262,6 @@ public class InflightReadsLimiter {
}
public boolean isDisabled() {
- return maxReadsInFlightSize <= 0;
+ return !enabled;
}
-
-
-}
+}
\ No newline at end of file
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
index d733b54dd13..5944199287e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -25,9 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import lombok.AllArgsConstructor;
-import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -95,15 +94,11 @@ public class PendingReadsManager {
this.rangeEntryCache = rangeEntryCache;
}
- @Value
- private static class PendingReadKey {
- private final long startEntry;
- private final long endEntry;
+ private record PendingReadKey(long startEntry, long endEntry) {
long size() {
return endEntry - startEntry + 1;
}
-
boolean includes(PendingReadKey other) {
return startEntry <= other.startEntry && other.endEntry <=
endEntry;
}
@@ -135,25 +130,18 @@ public class PendingReadsManager {
}
- @AllArgsConstructor
- private static final class ReadEntriesCallbackWithContext {
- final AsyncCallbacks.ReadEntriesCallback callback;
- final Object ctx;
- final long startEntry;
- final long endEntry;
+ private record
ReadEntriesCallbackWithContext(AsyncCallbacks.ReadEntriesCallback callback,
Object ctx,
+ long startEntry, long
endEntry) {
}
- @AllArgsConstructor
- private static final class FindPendingReadOutcome {
- final PendingRead pendingRead;
- final PendingReadKey missingOnLeft;
- final PendingReadKey missingOnRight;
+ private record FindPendingReadOutcome(PendingRead pendingRead,
+ PendingReadKey missingOnLeft,
PendingReadKey missingOnRight) {
boolean needsAdditionalReads() {
return missingOnLeft != null || missingOnRight != null;
}
}
- private FindPendingReadOutcome findPendingRead(PendingReadKey key,
Map<PendingReadKey,
+ private FindPendingReadOutcome findPendingRead(PendingReadKey key,
ConcurrentMap<PendingReadKey,
PendingRead> ledgerCache, AtomicBoolean created) {
synchronized (ledgerCache) {
PendingRead existing = ledgerCache.get(key);
@@ -222,18 +210,74 @@ public class PendingReadsManager {
private class PendingRead {
final PendingReadKey key;
- final Map<PendingReadKey, PendingRead> ledgerCache;
+ final ConcurrentMap<PendingReadKey, PendingRead> ledgerCache;
final List<ReadEntriesCallbackWithContext> callbacks = new
ArrayList<>(1);
boolean completed = false;
public PendingRead(PendingReadKey key,
- Map<PendingReadKey, PendingRead> ledgerCache) {
+ ConcurrentMap<PendingReadKey, PendingRead>
ledgerCache) {
this.key = key;
this.ledgerCache = ledgerCache;
}
- private List<EntryImpl> keepEntries(List<EntryImpl> list, long
startEntry, long endEntry) {
- List<EntryImpl> result = new ArrayList<>((int) (endEntry -
startEntry));
+ public void attach(CompletableFuture<List<EntryImpl>> handle) {
+ handle.whenComplete((entriesToReturn, error) -> {
+ // execute in the completing thread
+ completeAndRemoveFromCache();
+ // execute the callbacks in the managed ledger executor
+ rangeEntryCache.getManagedLedger().getExecutor().execute(() ->
{
+ if (error != null) {
+ readEntriesFailed(error);
+ } else {
+ readEntriesComplete(entriesToReturn);
+ }
+ });
+ });
+ }
+
+ private synchronized void completeAndRemoveFromCache() {
+ completed = true;
+ // When the read has completed, remove the instance from the
ledgerCache map
+ // so that new reads will go to a new instance.
+ // this is required because we are going to do refcount management
+ // on the results of the callback
+ ledgerCache.remove(key, this);
+ }
+
+ private synchronized void readEntriesComplete(List<EntryImpl>
entriesToReturn) {
+ if (callbacks.size() == 1) {
+ ReadEntriesCallbackWithContext first = callbacks.get(0);
+ if (first.startEntry == key.startEntry
+ && first.endEntry == key.endEntry) {
+ // perfect match, no copy, this is the most common case
+ first.callback.readEntriesComplete((List) entriesToReturn,
+ first.ctx);
+ } else {
+ first.callback.readEntriesComplete(
+ keepEntries(entriesToReturn, first.startEntry,
first.endEntry),
+ first.ctx);
+ }
+ } else {
+ for (ReadEntriesCallbackWithContext callback : callbacks) {
+ callback.callback.readEntriesComplete(
+ copyEntries(entriesToReturn, callback.startEntry,
callback.endEntry),
+ callback.ctx);
+ }
+ for (EntryImpl entry : entriesToReturn) {
+ entry.release();
+ }
+ }
+ }
+
+ private synchronized void readEntriesFailed(Throwable error) {
+ for (ReadEntriesCallbackWithContext callback : callbacks) {
+ ManagedLedgerException mlException =
createManagedLedgerException(error);
+ callback.callback.readEntriesFailed(mlException, callback.ctx);
+ }
+ }
+
+ private List<Entry> keepEntries(List<EntryImpl> list, long startEntry,
long endEntry) {
+ List<Entry> result = new ArrayList<>((int) (endEntry -
startEntry));
for (EntryImpl entry : list) {
long entryId = entry.getEntryId();
if (startEntry <= entryId && entryId <= endEntry) {
@@ -245,62 +289,16 @@ public class PendingReadsManager {
return result;
}
- public void attach(CompletableFuture<List<EntryImpl>> handle) {
- // when the future is done remove this from the map
- // new reads will go to a new instance
- // this is required because we are going to do refcount management
- // on the results of the callback
- handle.whenComplete((___, error) -> {
- synchronized (PendingRead.this) {
- completed = true;
- synchronized (ledgerCache) {
- ledgerCache.remove(key, this);
- }
- }
- });
-
- handle.thenAcceptAsync(entriesToReturn -> {
- synchronized (PendingRead.this) {
- if (callbacks.size() == 1) {
- ReadEntriesCallbackWithContext first =
callbacks.get(0);
- if (first.startEntry == key.startEntry
- && first.endEntry == key.endEntry) {
- // perfect match, no copy, this is the most common
case
- first.callback.readEntriesComplete((List)
entriesToReturn,
- first.ctx);
- } else {
- first.callback.readEntriesComplete(
- (List) keepEntries(entriesToReturn,
first.startEntry, first.endEntry),
- first.ctx);
- }
- } else {
- for (ReadEntriesCallbackWithContext callback :
callbacks) {
- long callbackStartEntry = callback.startEntry;
- long callbackEndEntry = callback.endEntry;
- List<EntryImpl> copy = new ArrayList<>((int)
(callbackEndEntry - callbackStartEntry + 1));
- for (EntryImpl entry : entriesToReturn) {
- long entryId = entry.getEntryId();
- if (callbackStartEntry <= entryId && entryId
<= callbackEndEntry) {
- EntryImpl entryCopy =
EntryImpl.create(entry);
- copy.add(entryCopy);
- }
- }
- callback.callback.readEntriesComplete((List) copy,
callback.ctx);
- }
- for (EntryImpl entry : entriesToReturn) {
- entry.release();
- }
- }
- }
- },
rangeEntryCache.getManagedLedger().getExecutor()).exceptionally(exception -> {
- synchronized (PendingRead.this) {
- for (ReadEntriesCallbackWithContext callback : callbacks) {
- ManagedLedgerException mlException =
createManagedLedgerException(exception);
- callback.callback.readEntriesFailed(mlException,
callback.ctx);
- }
+ private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long
startEntry, long endEntry) {
+ List<Entry> result = new ArrayList<>((int) (endEntry - startEntry
+ 1));
+ for (EntryImpl entry : entriesToReturn) {
+ long entryId = entry.getEntryId();
+ if (startEntry <= entryId && entryId <= endEntry) {
+ EntryImpl entryCopy = EntryImpl.create(entry);
+ result.add(entryCopy);
}
- return null;
- });
+ }
+ return result;
}
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback
callback,
@@ -318,7 +316,7 @@ public class PendingReadsManager {
final AsyncCallbacks.ReadEntriesCallback callback, Object
ctx) {
final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
- Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+ ConcurrentMap<PendingReadKey, PendingRead> pendingReadsForLedger =
cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new
ConcurrentHashMap<>());
boolean listenerAdded = false;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f378acfba14..c8d14cebebc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -22,18 +22,19 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -57,6 +58,8 @@ public class RangeEntryCacheImpl implements EntryCache {
* Overhead per-entry to take into account the envelope.
*/
public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
+ private static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
+ private static final boolean DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY = false;
private final RangeEntryCacheManagerImpl manager;
final ManagedLedgerImpl ml;
@@ -65,18 +68,16 @@ public class RangeEntryCacheImpl implements EntryCache {
private final boolean copyEntries;
private final PendingReadsManager pendingReadsManager;
- private volatile long estimatedEntrySize = 10 * 1024;
-
- private final long readEntryTimeoutMillis;
-
private static final double MB = 1024 * 1024;
+ private final LongAdder totalAddedEntriesSize = new LongAdder();
+ private final LongAdder totalAddedEntriesCount = new LongAdder();
+
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager,
ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
this.pendingReadsManager = new PendingReadsManager(this);
this.interceptor = ml.getManagedLedgerInterceptor();
- this.readEntryTimeoutMillis =
getManagedLedgerConfig().getReadEntryTimeoutSeconds();
this.entries = new RangeCache<>(EntryImpl::getLength,
EntryImpl::getTimestamp);
this.copyEntries = copyEntries;
@@ -117,17 +118,18 @@ public class RangeEntryCacheImpl implements EntryCache {
@Override
public boolean insert(EntryImpl entry) {
+ int entryLength = entry.getLength();
if (!manager.hasSpaceInCache()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping cache while doing eviction: {} -
size: {}", ml.getName(), entry.getPosition(),
- entry.getLength());
+ entryLength);
}
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Adding entry to cache: {} - size: {}",
ml.getName(), entry.getPosition(),
- entry.getLength());
+ entryLength);
}
PositionImpl position = entry.getPosition();
@@ -149,7 +151,9 @@ public class RangeEntryCacheImpl implements EntryCache {
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
- manager.entryAdded(entry.getLength());
+ totalAddedEntriesSize.add(entryLength);
+ totalAddedEntriesCount.increment();
+ manager.entryAdded(entryLength);
return true;
} else {
// entry was not inserted into cache, we need to discard it
@@ -225,7 +229,23 @@ public class RangeEntryCacheImpl implements EntryCache {
public void asyncReadEntry(ReadHandle lh, PositionImpl position, final
ReadEntryCallback callback,
final Object ctx) {
try {
- asyncReadEntry0(lh, position, callback, ctx);
+ asyncReadEntriesByPosition(lh, position, position, 1,
+ DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY,
+ new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object
ctx) {
+ if (entries.isEmpty()) {
+ callback.readEntryFailed(new
ManagedLedgerException("Could not read given position"), ctx);
+ } else {
+ callback.readEntryComplete(entries.get(0), ctx);
+ }
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
+ callback.readEntryFailed(exception, ctx);
+ }
+ }, ctx, true);
} catch (Throwable t) {
log.warn("failed to read entries for {}-{}", lh.getId(), position,
t);
// invalidate all entries related to ledger from the cache (it
might happen if entry gets corrupt
@@ -236,47 +256,6 @@ public class RangeEntryCacheImpl implements EntryCache {
}
}
- private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final
ReadEntryCallback callback,
- final Object ctx) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Reading entry ledger {}: {}", ml.getName(),
lh.getId(), position.getEntryId());
- }
- EntryImpl entry = entries.get(position);
- if (entry != null) {
- EntryImpl cachedEntry = EntryImpl.create(entry);
- entry.release();
- manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
- callback.readEntryComplete(cachedEntry, ctx);
- } else {
- ReadEntryUtils.readAsync(ml, lh, position.getEntryId(),
position.getEntryId()).thenAcceptAsync(
- ledgerEntries -> {
- try {
- Iterator<LedgerEntry> iterator =
ledgerEntries.iterator();
- if (iterator.hasNext()) {
- LedgerEntry ledgerEntry = iterator.next();
- EntryImpl returnEntry =
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
-
-
ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength());
- manager.mlFactoryMBean.recordCacheMiss(1,
returnEntry.getLength());
- ml.getMbean().addReadEntriesSample(1,
returnEntry.getLength());
- callback.readEntryComplete(returnEntry, ctx);
- } else {
- // got an empty sequence
- callback.readEntryFailed(new
ManagedLedgerException("Could not read given position"),
- ctx);
- }
- } finally {
- ledgerEntries.close();
- }
- }, ml.getExecutor()).exceptionally(exception -> {
- ml.invalidateLedgerHandle(lh);
- pendingReadsManager.invalidateLedger(lh.getId());
-
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
- return null;
- });
- }
- }
-
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
@@ -294,38 +273,123 @@ public class RangeEntryCacheImpl implements EntryCache {
@SuppressWarnings({ "unchecked", "rawtypes" })
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry,
boolean shouldCacheEntry,
- final ReadEntriesCallback callback, Object ctx, boolean
withLimits) {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
callback, ctx, null, withLimits);
+ final ReadEntriesCallback callback, Object ctx, boolean
acquirePermits) {
+ final long ledgerId = lh.getId();
+ final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+ final PositionImpl firstPosition = PositionImpl.get(ledgerId,
firstEntry);
+ final PositionImpl lastPosition = PositionImpl.get(ledgerId,
lastEntry);
+ asyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry, callback, ctx,
+ acquirePermits);
}
- void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long
lastEntry, boolean shouldCacheEntry,
- final ReadEntriesCallback originalCallback, Object ctx,
InflightReadsLimiter.Handle handle,
- boolean withLimits) {
- AsyncCallbacks.ReadEntriesCallback callback;
- if (withLimits) {
- callback = handlePendingReadsLimits(lh, firstEntry, lastEntry,
shouldCacheEntry, originalCallback, ctx,
- handle);
+ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition,
PositionImpl lastPosition,
+ int numberOfEntries, boolean
shouldCacheEntry,
+ final ReadEntriesCallback
originalCallback, Object ctx, boolean acquirePermits) {
+ checkArgument(firstPosition.getLedgerId() ==
lastPosition.getLedgerId(),
+ "Invalid range. Entries %s and %s should be in the same
ledger.",
+ firstPosition, lastPosition);
+ checkArgument(firstPosition.getLedgerId() == lh.getId(),
+ "Invalid ReadHandle. The ledger %s of the range positions
should match the handle's ledger %s.",
+ firstPosition.getLedgerId(), lh.getId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Reading {} entries in range {} to {}",
ml.getName(), numberOfEntries, firstPosition,
+ lastPosition);
+ }
+
+ InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+ if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+ doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry,
+ originalCallback, ctx);
} else {
- callback = originalCallback;
+ long estimatedEntrySize = getEstimatedEntrySize();
+ long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+ if (log.isDebugEnabled()) {
+ log.debug("Estimated read size: {} bytes for {} entries with
{} estimated entry size",
+ estimatedReadSize,
+ numberOfEntries, estimatedEntrySize);
+ }
+ Optional<InflightReadsLimiter.Handle> optionalHandle =
+ pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+ // permits were not immediately available, callback
will be executed when permits are acquired
+ // or timeout
+ ml.getExecutor().execute(() -> {
+ doAsyncReadEntriesWithAcquiredPermits(lh,
firstPosition, lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
handle, estimatedReadSize);
+ });
+ });
+ // permits were immediately available and acquired
+ if (optionalHandle.isPresent()) {
+ doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition,
lastPosition, numberOfEntries,
+ shouldCacheEntry, originalCallback, ctx,
optionalHandle.get(), estimatedReadSize);
+ }
}
- if (callback == null) {
+ }
+
+ void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, PositionImpl
firstPosition, PositionImpl lastPosition,
+ int numberOfEntries, boolean
shouldCacheEntry,
+ final ReadEntriesCallback
originalCallback, Object ctx,
+ InflightReadsLimiter.Handle
handle, long estimatedReadSize) {
+ if (!handle.success()) {
+ String message = String.format(
+ "Couldn't acquire enough permits on the max reads in
flight limiter to read from ledger "
+ + "%d, %s, estimated read size %d bytes for %d
entries (check "
+ + "managedLedgerMaxReadsInFlightSizeInMB, "
+ +
"managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and "
+ +
"managedLedgerMaxReadsInFlightPermitsAcquireQueueSize)", lh.getId(), getName(),
+ estimatedReadSize, numberOfEntries);
+ log.error(message);
+ originalCallback.readEntriesFailed(new
ManagedLedgerException.TooManyRequestsException(message), ctx);
return;
}
+ InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+ ReadEntriesCallback wrappedCallback = new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx2) {
+ if (!entries.isEmpty()) {
+ // release permits only when entries have been handled
+ AtomicInteger remainingCount = new
AtomicInteger(entries.size());
+ for (Entry entry : entries) {
+ ((EntryImpl) entry).onDeallocate(() -> {
+ if (remainingCount.decrementAndGet() <= 0) {
+ pendingReadsLimiter.release(handle);
+ }
+ });
+ }
+ } else {
+ pendingReadsLimiter.release(handle);
+ }
+ originalCallback.readEntriesComplete(entries, ctx2);
+ }
- final long ledgerId = lh.getId();
- final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
- final PositionImpl firstPosition = PositionImpl.get(lh.getId(),
firstEntry);
- final PositionImpl lastPosition = PositionImpl.get(lh.getId(),
lastEntry);
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx2) {
+ pendingReadsLimiter.release(handle);
+ originalCallback.readEntriesFailed(exception, ctx2);
+ }
+ };
+ doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry,
+ wrappedCallback, ctx);
+ }
- if (log.isDebugEnabled()) {
- log.debug("[{}] Reading entries range ledger {}: {} to {}",
ml.getName(), ledgerId, firstEntry, lastEntry);
+ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl
firstPosition, PositionImpl lastPosition,
+ int numberOfEntries, boolean
shouldCacheEntry, final ReadEntriesCallback callback,
+ Object ctx) {
+ Collection<EntryImpl> cachedEntries;
+ if (firstPosition.compareTo(lastPosition) == 0) {
+ EntryImpl cachedEntry = entries.get(firstPosition);
+ if (cachedEntry == null) {
+ cachedEntries = Collections.emptyList();
+ } else {
+ cachedEntries = Collections.singleton(cachedEntry);
+ }
+ } else {
+ cachedEntries = entries.getRange(firstPosition, lastPosition);
}
- Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition,
lastPosition);
-
- if (cachedEntries.size() == entriesToRead) {
+ if (cachedEntries.size() == numberOfEntries) {
long totalCachedSize = 0;
- final List<EntryImpl> entriesToReturn =
Lists.newArrayListWithExpectedSize(entriesToRead);
+ final List<Entry> entriesToReturn = new
ArrayList<>(numberOfEntries);
// All entries found in cache
for (EntryImpl entry : cachedEntries) {
@@ -336,11 +400,11 @@ public class RangeEntryCacheImpl implements EntryCache {
manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(),
totalCachedSize);
if (log.isDebugEnabled()) {
- log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}",
ml.getName(), ledgerId, firstEntry,
- lastEntry);
+ log.debug("[{}] Cache hit for {} entries in range {} to {}",
ml.getName(), numberOfEntries,
+ firstPosition, lastPosition);
}
- callback.readEntriesComplete((List) entriesToReturn, ctx);
+ callback.readEntriesComplete(entriesToReturn, ctx);
} else {
if (!cachedEntries.isEmpty()) {
@@ -348,77 +412,24 @@ public class RangeEntryCacheImpl implements EntryCache {
}
// Read all the entries from bookkeeper
- pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+ pendingReadsManager.readEntries(lh, firstPosition.getEntryId(),
lastPosition.getEntryId(),
shouldCacheEntry, callback, ctx);
-
}
}
- private AsyncCallbacks.ReadEntriesCallback
handlePendingReadsLimits(ReadHandle lh,
- long
firstEntry, long lastEntry,
- boolean
shouldCacheEntry,
-
AsyncCallbacks.ReadEntriesCallback originalCallback,
- Object ctx,
InflightReadsLimiter.Handle handle) {
- InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
- if (pendingReadsLimiter.isDisabled()) {
- return originalCallback;
+ @VisibleForTesting
+ public long getEstimatedEntrySize() {
+ long estimatedEntrySize = getAvgEntrySize();
+ if (estimatedEntrySize == 0) {
+ estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
}
- long estimatedReadSize = (1 + lastEntry - firstEntry)
- * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
- final AsyncCallbacks.ReadEntriesCallback callback;
- InflightReadsLimiter.Handle newHandle =
pendingReadsLimiter.acquire(estimatedReadSize, handle);
- if (!newHandle.success) {
- long now = System.currentTimeMillis();
- if (now - newHandle.creationTime > readEntryTimeoutMillis) {
- String message = "Time-out elapsed while acquiring enough
permits "
- + "on the memory limiter to read from ledger "
- + lh.getId()
- + ", " + getName()
- + ", estimated read size " + estimatedReadSize + "
bytes"
- + " for " + (1 + lastEntry - firstEntry)
- + " entries (check
managedLedgerMaxReadsInFlightSizeInMB)";
- log.error(message);
- pendingReadsLimiter.release(newHandle);
- originalCallback.readEntriesFailed(
- new
ManagedLedgerException.TooManyRequestsException(message), ctx);
- return null;
- }
- ml.getExecutor().execute(() -> {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry,
shouldCacheEntry,
- originalCallback, ctx, newHandle, true);
- });
- return null;
- } else {
- callback = new AsyncCallbacks.ReadEntriesCallback() {
-
- @Override
- public void readEntriesComplete(List<Entry> entries, Object
ctx) {
- if (!entries.isEmpty()) {
- long size = entries.get(0).getLength();
- estimatedEntrySize = size;
-
- AtomicInteger remainingCount = new
AtomicInteger(entries.size());
- for (Entry entry : entries) {
- ((EntryImpl) entry).onDeallocate(() -> {
- if (remainingCount.decrementAndGet() <= 0) {
- pendingReadsLimiter.release(newHandle);
- }
- });
- }
- } else {
- pendingReadsLimiter.release(newHandle);
- }
- originalCallback.readEntriesComplete(entries, ctx);
- }
+ return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ }
- @Override
- public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
- pendingReadsLimiter.release(newHandle);
- originalCallback.readEntriesFailed(exception, ctx);
- }
- };
- }
- return callback;
+ private long getAvgEntrySize() {
+ long totalAddedEntriesCount = this.totalAddedEntriesCount.sum();
+ long totalAddedEntriesSize = this.totalAddedEntriesSize.sum();
+ return totalAddedEntriesCount != 0 ? totalAddedEntriesSize /
totalAddedEntriesCount : 0;
}
/**
@@ -441,8 +452,7 @@ public class RangeEntryCacheImpl implements EntryCache {
try {
// We got the entries, we need to transform
them to a List<> type
long totalSize = 0;
- final List<EntryImpl> entriesToReturn =
-
Lists.newArrayListWithExpectedSize(entriesToRead);
+ final List<EntryImpl> entriesToReturn = new
ArrayList<>(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry =
RangeEntryCacheManagerImpl.create(e, interceptor);
entriesToReturn.add(entry);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index d5a3019855c..4fff47df822 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -27,7 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
@@ -56,12 +58,15 @@ public class RangeEntryCacheManagerImpl implements
EntryCacheManager {
private static final double evictionTriggerThresholdPercent = 0.98;
- public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
- this.maxSize = factory.getConfig().getMaxCacheSize();
- this.inflightReadsLimiter = new InflightReadsLimiter(
- factory.getConfig().getManagedLedgerMaxReadsInFlightSize());
+ public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory,
OrderedScheduler scheduledExecutor) {
+ ManagedLedgerFactoryConfig config = factory.getConfig();
+ this.maxSize = config.getMaxCacheSize();
+ this.inflightReadsLimiter = new
InflightReadsLimiter(config.getManagedLedgerMaxReadsInFlightSize(),
+
config.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(),
+
config.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(),
+ scheduledExecutor);
this.evictionTriggerThreshold = (long) (maxSize *
evictionTriggerThresholdPercent);
- this.cacheEvictionWatermark =
factory.getConfig().getCacheEvictionWatermark();
+ this.cacheEvictionWatermark = config.getCacheEvictionWatermark();
this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
this.mlFactory = factory;
this.mlFactoryMBean = factory.getMbean();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
index b57dea6a5bb..48f0cf08ddf 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -40,7 +40,6 @@ import
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
@@ -142,8 +141,8 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
- Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
- Assert.assertEquals(sizePerEntry1, 1);
+ Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
+ Assert.assertEquals(sizePerEntry1, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes =limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
@@ -179,8 +178,8 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
- Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
- Assert.assertEquals(sizePerEntry2, 1);
+ Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
+ Assert.assertEquals(sizePerEntry2, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2,
1);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
@@ -192,8 +191,8 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
readCompleteSignal2.countDown();
cb2.entries.join();
- Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache,
"estimatedEntrySize");
- Assert.assertEquals(sizePerEntry3, 1);
+ Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
+ Assert.assertEquals(sizePerEntry3, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8620604e41b..d09bb3ac3f5 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -94,6 +94,7 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -3129,17 +3130,26 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerConfig config = new
ManagedLedgerConfig().setReadEntryTimeoutSeconds(1);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("timeout_ledger_test", config);
- BookKeeper bk = mock(BookKeeper.class);
- doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(),
any(), any(), any(), any(), any());
+ Position position = ledger.addEntry("entry-1".getBytes());
+
+ // ensure that the read isn't cached
+ factory.getEntryCacheManager().clear();
+
+ bkc.setReadHandleInterceptor(new PulsarMockReadHandleInterceptor() {
+ @Override
+ public CompletableFuture<LedgerEntries> interceptReadAsync(long
ledgerId, long firstEntry, long lastEntry,
+
LedgerEntries entries) {
+ return CompletableFuture.supplyAsync(() -> {
+ return entries;
+ }, CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
+ }
+ });
+
AtomicReference<ManagedLedgerException> responseException1 = new
AtomicReference<>();
String ctxStr = "timeoutCtx";
- CompletableFuture<LedgerEntries> entriesFuture = new
CompletableFuture<>();
- ReadHandle ledgerHandle = mock(ReadHandle.class);
-
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.EARLIEST.getLedgerId(),
- PositionImpl.EARLIEST.getEntryId());
// (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
- ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST, new
ReadEntryCallback() {
+ ledger.asyncReadEntry((PositionImpl) position, new ReadEntryCallback()
{
@Override
public void readEntryComplete(Entry entry, Object ctx) {
responseException1.set(null);
@@ -3151,18 +3161,20 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
responseException1.set(exception);
}
}, ctxStr);
- ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {},
Collections.emptyMap());
- retryStrategically((test) -> responseException1.get() != null, 5,
1000);
- assertNotNull(responseException1.get());
- assertTrue(responseException1.get().getMessage()
-
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
- // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
- AtomicReference<ManagedLedgerException> responseException2 = new
AtomicReference<>();
- PositionImpl readPositionRef = PositionImpl.EARLIEST;
- ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger,
"cursor1");
- OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef,
1, new ReadEntriesCallback() {
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(responseException1.get());
+ assertTrue(responseException1.get().getMessage()
+
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+ });
+ // ensure that the read isn't cached
+ factory.getEntryCacheManager().clear();
+
+ // (2) test read-timeout for: ManagedCursor.asyncReadEntries(..)
+ AtomicReference<ManagedLedgerException> responseException2 = new
AtomicReference<>();
+ ManagedCursor cursor = ledger.openCursor("cursor1",
InitialPosition.Earliest);
+ cursor.asyncReadEntries(1, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
}
@@ -3172,16 +3184,13 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
assertEquals(ctxStr, (String) ctx);
responseException2.set(exception);
}
+ }, ctxStr, PositionImpl.LATEST);
- }, null, PositionImpl.LATEST, null);
- ledger.asyncReadEntry(ledgerHandle,
PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
- opReadEntry, ctxStr);
- retryStrategically((test) -> {
- return responseException2.get() != null;
- }, 5, 1000);
- assertNotNull(responseException2.get());
- assertTrue(responseException2.get().getMessage()
-
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(responseException2.get());
+ assertTrue(responseException2.get().getMessage()
+
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+ });
ledger.close();
}
@@ -3720,6 +3729,10 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
for (int i = 0; i < entries; i++) {
ledger.addEntry(String.valueOf(i).getBytes(Encoding));
}
+
+ // clear the cache to avoid flakiness
+ factory.getEntryCacheManager().clear();
+
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
Awaitility.await().untilAsserted(() -> {
@@ -3788,10 +3801,16 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
for (int i = 0; i < entries; i++) {
ledger.addEntry(String.valueOf(i).getBytes(Encoding));
}
- List<Entry> entryList = cursor.readEntries(3);
- assertEquals(entryList.size(), 3);
- assertEquals(ledger.ledgers.size(), 4);
- assertEquals(ledger.ledgerCache.size(), 3);
+
+ // clear the cache to avoid flakiness
+ factory.getEntryCacheManager().clear();
+
+ final List<Entry> entryList = cursor.readEntries(3);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(entryList.size(), 3);
+ assertEquals(ledger.ledgers.size(), 4);
+ assertEquals(ledger.ledgerCache.size(), 3);
+ });
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
@@ -3800,11 +3819,17 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
assertEquals(ledger.ledgerCache.size(), 0);
});
+ // clear the cache to avoid flakiness
+ factory.getEntryCacheManager().clear();
+
// Verify the ReadHandle can be reopened.
ManagedCursor cursor3 = ledger.openCursor("test-cursor3",
InitialPosition.Earliest);
- entryList = cursor3.readEntries(3);
- assertEquals(entryList.size(), 3);
- assertEquals(ledger.ledgerCache.size(), 3);
+ final List<Entry> entryList2 = cursor3.readEntries(3);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(entryList2.size(), 3);
+ assertEquals(ledger.ledgerCache.size(), 3);
+ });
+
cursor3.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
@@ -3812,7 +3837,6 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
assertEquals(ledger.ledgerCache.size(), 0);
});
-
cursor.close();
cursor2.close();
cursor3.close();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 2b69581ca2c..3da8cdf517c 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -18,155 +18,492 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
public class InflightReadsLimiterTest {
+ private static final int ACQUIRE_QUEUE_SIZE = 1000;
+ private static final int ACQUIRE_TIMEOUT_MILLIS = 500;
+
+ @DataProvider
+ private static Object[][] isDisabled() {
+ return new Object[][]{
+ {0, true},
+ {-1, true},
+ {1, false},
+ };
+ }
- @Test
- public void testDisabled() throws Exception {
-
- InflightReadsLimiter limiter = new InflightReadsLimiter(0);
- assertTrue(limiter.isDisabled());
-
- limiter = new InflightReadsLimiter(-1);
- assertTrue(limiter.isDisabled());
+ @DataProvider
+ private static Object[] booleanValues() {
+ return new Object[]{ true, false };
+ }
- limiter = new InflightReadsLimiter(1);
- assertFalse(limiter.isDisabled());
+ @Test(dataProvider = "isDisabled")
+ public void testDisabled(long maxReadsInFlightSize, boolean
shouldBeDisabled) throws Exception {
+ var limiter = new InflightReadsLimiter(maxReadsInFlightSize,
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS,
+ mock(ScheduledExecutorService.class));
+ assertThat(limiter.isDisabled()).isEqualTo(shouldBeDisabled);
}
@Test
public void testBasicAcquireRelease() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
- assertEquals(100, limiter.getRemainingBytes());
- InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
- assertEquals(0, limiter.getRemainingBytes());
- assertTrue(handle.success);
- assertEquals(handle.acquiredPermits, 100);
- assertEquals(1, handle.trials);
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ mock(ScheduledExecutorService.class));
+ assertThat(limiter.getRemainingBytes()).isEqualTo(100);
+
+ Optional<InflightReadsLimiter.Handle> optionalHandle =
limiter.acquire(100, null);
+ assertThat(limiter.getRemainingBytes()).isZero();
+ assertThat(optionalHandle).isPresent();
+ InflightReadsLimiter.Handle handle = optionalHandle.get();
+ assertThat(handle.success()).isTrue();
+ assertThat(handle.permits()).isEqualTo(100);
+
limiter.release(handle);
- assertEquals(100, limiter.getRemainingBytes());
+ assertThat(limiter.getRemainingBytes()).isEqualTo(100);
}
@Test
public void testNotEnoughPermits() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
- assertEquals(100, limiter.getRemainingBytes());
- InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
- assertEquals(0, limiter.getRemainingBytes());
- assertTrue(handle.success);
- assertEquals(handle.acquiredPermits, 100);
- assertEquals(1, handle.trials);
-
- InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 0);
- assertEquals(1, handle2.trials);
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ mock(ScheduledExecutorService.class));
+ assertThat(limiter.getRemainingBytes()).isEqualTo(100);
+ Optional<InflightReadsLimiter.Handle> optionalHandle =
limiter.acquire(100, null);
+ assertThat(limiter.getRemainingBytes()).isZero();
+ assertThat(optionalHandle).isPresent();
+ InflightReadsLimiter.Handle handle = optionalHandle.get();
+ assertThat(handle.success()).isTrue();
+ assertThat(handle.permits()).isEqualTo(100);
+
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> optionalHandle2 =
limiter.acquire(100, handle2Reference::set);
+ assertThat(limiter.getRemainingBytes()).isZero();
+ assertThat(optionalHandle2).isNotPresent();
limiter.release(handle);
- assertEquals(100, limiter.getRemainingBytes());
-
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertTrue(handle2.success);
- assertEquals(handle2.acquiredPermits, 100);
- assertEquals(2, handle2.trials);
-
- limiter.release(handle2);
- assertEquals(100, limiter.getRemainingBytes());
+ assertThat(handle2Reference)
+ .hasValueSatisfying(h ->
+ assertThat(h.success()).isTrue());
+ limiter.release(handle2Reference.get());
+ assertThat(limiter.getRemainingBytes()).isEqualTo(100);
}
@Test
- public void testPartialAcquire() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
- assertEquals(100, limiter.getRemainingBytes());
-
- InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
- assertEquals(70, limiter.getRemainingBytes());
- assertTrue(handle.success);
- assertEquals(handle.acquiredPermits, 30);
- assertEquals(1, handle.trials);
-
- InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 70);
- assertEquals(1, handle2.trials);
-
- limiter.release(handle);
-
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertTrue(handle2.success);
- assertEquals(handle2.acquiredPermits, 100);
- assertEquals(2, handle2.trials);
-
- limiter.release(handle2);
- assertEquals(100, limiter.getRemainingBytes());
-
+ public void testAcquireTimeout() throws Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ executor);
+ assertThat(limiter.getRemainingBytes()).isEqualTo(100);
+ limiter.acquire(100, null);
+
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> optionalHandle2 =
limiter.acquire(100, handle2Reference::set);
+ assertThat(optionalHandle2).isNotPresent();
+
+ Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+ assertThat(handle2Reference).hasValueSatisfying(h ->
assertThat(h.success()).isFalse());
}
@Test
- public void testTooManyTrials() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
- assertEquals(100, limiter.getRemainingBytes());
-
- InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
- assertEquals(70, limiter.getRemainingBytes());
- assertTrue(handle.success);
- assertEquals(handle.acquiredPermits, 30);
- assertEquals(1, handle.trials);
-
- InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 70);
- assertEquals(1, handle2.trials);
-
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 70);
- assertEquals(2, handle2.trials);
-
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 70);
- assertEquals(3, handle2.trials);
-
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 70);
- assertEquals(4, handle2.trials);
-
- // too many trials, start from scratch
- handle2 = limiter.acquire(100, handle2);
- assertEquals(70, limiter.getRemainingBytes());
- assertFalse(handle2.success);
- assertEquals(handle2.acquiredPermits, 0);
- assertEquals(1, handle2.trials);
+ public void testMultipleQueuedEntriesWithExceptionInFirstCallback() throws
Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ executor);
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should be 100")
+ .isEqualTo(100);
+
+ // Acquire the initial permits
+ Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100,
null);
+ assertThat(handle1)
+ .as("Initial handle should be present")
+ .isPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be 0 after acquiring 100 permits")
+ .isEqualTo(0);
+
+ // Queue the first handle with a callback that throws an exception
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50,
handle -> {
+ handle2Reference.set(handle);
+ throw new RuntimeException("Callback exception");
+ });
+ assertThat(handle2)
+ .as("Second handle should not be present")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition")
+ .isEqualTo(0);
+
+ // Queue the second handle with a successful callback
+ AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50,
handle3Reference::set);
+ assertThat(handle3)
+ .as("Third handle should not be present as queue is full")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0")
+ .isEqualTo(0);
+
+ // Release the initial handle to trigger the queued callbacks
+ limiter.release(handle1.get());
+
+ // Verify the first callback threw an exception but the second
callback was handled successfully
+ assertThat(handle2Reference)
+ .as("Handle2 should have been set in the callback despite the
exception")
+ .hasValueSatisfying(handle -> assertThat(handle.success())
+ .as("Handle2 should be marked as successful")
+ .isTrue());
+ assertThat(handle3Reference)
+ .as("Handle3 should have been set successfully")
+ .hasValueSatisfying(handle -> assertThat(handle.success())
+ .as("Handle3 should be marked as successful")
+ .isTrue());
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after first releases
are acquired")
+ .isEqualTo(0);
+
+ // Release the second handle
+ limiter.release(handle3Reference.get());
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be 50 after releasing handle3")
+ .isEqualTo(50);
+
+ // Release the third handle
+ limiter.release(handle3Reference.get());
+ assertThat(limiter.getRemainingBytes())
+ .as("All bytes should be released, so remaining bytes should
be 100")
+ .isEqualTo(100);
+ }
- limiter.release(handle);
+ @Test
+ public void
testMultipleQueuedEntriesWithTimeoutAndExceptionInFirstCallback() throws
Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ executor);
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should be 100")
+ .isEqualTo(100);
+
+ // Acquire the initial permits
+ Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100,
null);
+ assertThat(handle1)
+ .as("The first handle should be present after acquiring 100
permits")
+ .isPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be 0 after acquiring all permits")
+ .isEqualTo(0);
+
+ // Queue the first handle with a callback that times out and throws an
exception
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50,
handle -> {
+ handle2Reference.set(handle);
+ throw new RuntimeException("Callback exception on timeout");
+ });
+ assertThat(handle2)
+ .as("The second handle should not be present as the callback
throws an exception")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition")
+ .isEqualTo(0);
+
+ // Introduce a delay to differentiate operations between queued entries
+ Thread.sleep(50);
+
+ // Queue the second handle with a successful callback
+ AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50,
handle3Reference::set);
+ assertThat(handle3)
+ .as("The third handle should not be present as permits are
still unavailable")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition attempt")
+ .isEqualTo(0);
+
+ // Wait for the timeout to occur
+ Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+ // Verify the first callback timed out and threw an exception, and the
second callback was handled
+ assertThat(handle2Reference)
+ .as("Handle2 should have been set in the callback despite the
exception")
+ .hasValueSatisfying(handle -> assertThat(handle.success())
+ .as("Handle2 should be marked as unsuccessful due to a
timeout")
+ .isFalse());
+ assertThat(handle3Reference)
+ .as("Handle3 should have been set in the callback after the
permits became available")
+ .hasValueSatisfying(handle -> assertThat(handle.success())
+ .as("Handle3 should be marked as unsuccessful due to a
timeout")
+ .isFalse());
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 as no permits were
released")
+ .isEqualTo(0);
+
+ // Release the first handle
+ limiter.release(handle1.get());
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be fully restored to 100 after
releasing all permits")
+ .isEqualTo(100);
+ }
- handle2 = limiter.acquire(100, handle2);
- assertEquals(0, limiter.getRemainingBytes());
- assertTrue(handle2.success);
- assertEquals(handle2.acquiredPermits, 100);
- assertEquals(2, handle2.trials);
+ @Test
+ public void
testMultipleQueuedEntriesWithTimeoutsThatAreTimedOutWhenPermitsAreAvailable()
throws Exception {
+ // Use a mock executor to simulate scenarios where timed out queued
handles are processed when permits become
+ // available
+ ScheduledExecutorService executor =
mock(ScheduledExecutorService.class);
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE,
ACQUIRE_TIMEOUT_MILLIS,
+ executor);
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should be 100")
+ .isEqualTo(100);
+
+ // Acquire the initial permits
+ Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100,
null);
+ assertThat(handle1)
+ .as("The first handle should be present after acquiring 100
permits")
+ .isPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be 0 after acquiring all permits")
+ .isEqualTo(0);
+
+ // Queue the first handle
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50,
handle2Reference::set);
+ assertThat(handle2)
+ .as("The second handle should not be present as permits are
unavailable")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition attempt for handle2")
+ .isEqualTo(0);
+
+ // Queue the second handle
+ AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50,
handle3Reference::set);
+ assertThat(handle3)
+ .as("The third handle should not be present as permits are
unavailable")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition attempt for handle3")
+ .isEqualTo(0);
+
+ // Wait for the timeout to occur
+ Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+ // Queue another handle
+ AtomicReference<InflightReadsLimiter.Handle> handle4Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle4 = limiter.acquire(50,
handle4Reference::set);
+ assertThat(handle4)
+ .as("The fourth handle should not be present because permits
are unavailable")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition attempt for handle4")
+ .isEqualTo(0);
+
+ // Queue another handle
+ AtomicReference<InflightReadsLimiter.Handle> handle5Reference = new
AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handle5 = limiter.acquire(100,
handle5Reference::set);
+ assertThat(handle5)
+ .as("The fifth handle should not be present as permits are
unavailable")
+ .isNotPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should still be 0 after failed
acquisition attempt for handle5")
+ .isEqualTo(0);
+
+ // Release the first handle
+ limiter.release(handle1.get());
+
+ assertThat(handle2Reference)
+ .as("Handle2 should have been set in the callback and marked
unsuccessful")
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isFalse());
+
+ assertThat(handle3Reference)
+ .as("Handle3 should have been set in the callback and marked
unsuccessful")
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isFalse());
+
+ assertThat(handle4Reference)
+ .as("Handle4 should have been set in the callback and marked
successful")
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isTrue());
+
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be 50 after releasing handle4")
+ .isEqualTo(50);
+
+ limiter.release(handle4Reference.get());
+
+ assertThat(handle5Reference)
+ .as("Handle5 should have been set in the callback and marked
successful")
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isTrue());
+
+ limiter.release(handle5Reference.get());
+
+ assertThat(limiter.getRemainingBytes())
+ .as("All bytes should be released, so remaining bytes should
be back to 100")
+ .isEqualTo(100);
+ }
- limiter.release(handle2);
- assertEquals(100, limiter.getRemainingBytes());
+ @Test
+ public void testQueueSizeLimitReached() throws Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+
+ // Minimum queue size is 4.
+ final int queueSizeLimit = 4;
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(100, queueSizeLimit,
ACQUIRE_TIMEOUT_MILLIS, executor);
+
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should be 100")
+ .isEqualTo(100);
+
+ // Acquire all available permits (consume 100 bytes)
+ Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100,
null);
+ assertThat(handle1)
+ .as("The first handle should be present after acquiring all
available permits")
+ .isPresent()
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isTrue());
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be zero after acquiring all
permits")
+ .isEqualTo(0);
+
+ // Queue up to the limit (4 requests)
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+ assertThat(limiter.acquire(50, handle2Reference::set)).isNotPresent();
+
+ AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new
AtomicReference<>();
+ assertThat(limiter.acquire(50, handle3Reference::set)).isNotPresent();
+
+ AtomicReference<InflightReadsLimiter.Handle> handle4Reference = new
AtomicReference<>();
+ assertThat(limiter.acquire(50, handle4Reference::set)).isNotPresent();
+
+ AtomicReference<InflightReadsLimiter.Handle> handle5Reference = new
AtomicReference<>();
+ assertThat(limiter.acquire(50, handle5Reference::set)).isNotPresent();
+
+ // Attempt to add one more request, which should fail as the queue is
full
+ Optional<InflightReadsLimiter.Handle> handle6 = limiter.acquire(50,
null);
+ assertThat(handle6)
+                .as("The sixth handle should not be successful since the
queue is full")
+ .hasValueSatisfying(handle ->
assertThat(handle.success()).isFalse());
+ }
+ @Test(dataProvider = "booleanValues")
+ public void testAcquireExceedingMaxReadsInFlightSize(boolean firstInQueue)
throws Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+
+ long maxReadsInFlightSize = 100;
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(maxReadsInFlightSize,
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor);
+
+ // Initial state
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should match
maxReadsInFlightSize")
+ .isEqualTo(maxReadsInFlightSize);
+
+ // Acquire all permits (consume 100 bytes)
+ Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100,
null);
+ assertThat(handle1)
+ .as("The first handle should be present")
+ .isPresent();
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be zero after acquiring all
permits")
+ .isEqualTo(0);
+
+
+ AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new
AtomicReference<>();
+
+ if (!firstInQueue) {
+ Optional<InflightReadsLimiter.Handle> handle2 =
limiter.acquire(50, handle2Reference::set);
+ assertThat(handle2)
+ .as("The second handle should not be present as remaining
permits are zero")
+ .isNotPresent();
+ }
+
+ // Attempt to acquire more than maxReadsInFlightSize while all permits
are in use
+ AtomicReference<InflightReadsLimiter.Handle>
handleExceedingMaxReference = new AtomicReference<>();
+ Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
+ limiter.acquire(200, handleExceedingMaxReference::set);
+        assertThat(handleExceedingMaxOptional)
+                .as("The handle exceeding max should not be present as
remaining permits are zero")
+                .isNotPresent();
+
+ // Release handle1 permits
+ limiter.release(handle1.get());
+
+ if (!firstInQueue) {
+ assertThat(handle2Reference)
+ .as("Handle2 should have been set in the callback and
marked successful")
+ .hasValueSatisfying(handle -> {
+ assertThat(handle.success()).isTrue();
+ assertThat(handle.permits()).isEqualTo(50);
+ });
+ limiter.release(handle2Reference.get());
+ }
+
+        assertThat(handleExceedingMaxReference)
+                .as("HandleExceedingMax should have been set in the callback
and marked successful")
+ .hasValueSatisfying(handle -> {
+ assertThat(handle.success()).isTrue();
+
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
+ });
+
+ limiter.release(handleExceedingMaxReference.get());
+
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be fully replenished after
releasing all permits")
+ .isEqualTo(maxReadsInFlightSize);
}
+ @Test
+ public void testAcquireExceedingMaxReadsWhenAllPermitsAvailable() throws
Exception {
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+
+ long maxReadsInFlightSize = 100;
+ InflightReadsLimiter limiter =
+ new InflightReadsLimiter(maxReadsInFlightSize,
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor);
+
+ // Initial state
+ assertThat(limiter.getRemainingBytes())
+ .as("Initial remaining bytes should match
maxReadsInFlightSize")
+ .isEqualTo(maxReadsInFlightSize);
+
+ // Acquire permits > maxReadsInFlightSize
+ Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
+ limiter.acquire(2 * maxReadsInFlightSize, null);
+ assertThat(handleExceedingMaxOptional)
+ .as("The handle for exceeding max permits should be present")
+ .hasValueSatisfying(handle -> {
+ assertThat(handle.success()).isTrue();
+
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
+ });
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be zero after acquiring all
permits")
+ .isEqualTo(0);
+
+ // Release permits
+ limiter.release(handleExceedingMaxOptional.get());
+
+ assertThat(limiter.getRemainingBytes())
+ .as("Remaining bytes should be fully replenished after
releasing all permits")
+ .isEqualTo(maxReadsInFlightSize);
+ }
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 75e371fa97c..ea4c7189933 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -93,7 +94,8 @@ public class PendingReadsManagerTest {
config.setReadEntryTimeoutSeconds(10000);
when(rangeEntryCache.getName()).thenReturn("my-topic");
when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);
- inflighReadsLimiter = new InflightReadsLimiter(0);
+ inflighReadsLimiter = new InflightReadsLimiter(0, 0, 0,
+ mock(ScheduledExecutorService.class));
when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter);
pendingReadsManager = new PendingReadsManager(rangeEntryCache);
doAnswer(new Answer() {
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ff4646820f2..d58679876ae 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1997,6 +1997,15 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ " Consumer Netty channel. Use O to disable")
private long managedLedgerMaxReadsInFlightSizeInMB = 0;
+ @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait
for acquiring permits for max reads in "
+ + "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0)
and the limit is reached.")
+ private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis =
60000;
+
+ @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum number of
reads that can be queued for acquiring "
+ + "permits for max reads in flight when
managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit "
+ + "is reached.")
+ private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 50000;
+
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 0dc9f25fc33..805127dfcfb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -65,8 +65,21 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
- managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(
- conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L *
1024L);
+ long managedLedgerMaxReadsInFlightSizeBytes =
conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L;
+ if (managedLedgerMaxReadsInFlightSizeBytes > 0 &&
conf.getDispatcherMaxReadSizeBytes() > 0
+ && managedLedgerMaxReadsInFlightSizeBytes <
conf.getDispatcherMaxReadSizeBytes()) {
+ log.warn("Invalid configuration for
managedLedgerMaxReadsInFlightSizeInMB: {}, "
+ + "dispatcherMaxReadSizeBytes: {}.
managedLedgerMaxReadsInFlightSizeInMB in bytes should "
+ + "be greater than dispatcherMaxReadSizeBytes. You
should set "
+ + "managedLedgerMaxReadsInFlightSizeInMB to at
least {}",
+ conf.getManagedLedgerMaxReadsInFlightSizeInMB(),
conf.getDispatcherMaxReadSizeBytes(),
+ (conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) +
1);
+ }
+
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes);
+
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(
+
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis());
+
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(
+
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize());
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(
conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());