This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c237cb52b04 [improve][ml] Use lock-free queue in InflightReadsLimiter
since there's no concurrent access (#23962)
c237cb52b04 is described below
commit c237cb52b042c142822fffd64a654d1004365d0f
Author: guan46 <[email protected]>
AuthorDate: Tue Feb 25 19:08:33 2025 +0800
[improve][ml] Use lock-free queue in InflightReadsLimiter since there's no
concurrent access (#23962)
(cherry picked from commit 38a41e0d29192d0e29cc172cccf6c187cf7cb542)
---
.../mledger/impl/cache/InflightReadsLimiter.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index 3a6bb3cd039..c2b9b82b694 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -20,13 +20,13 @@ package org.apache.bookkeeper.mledger.impl.cache;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Gauge;
+import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
-import org.jctools.queues.SpscArrayQueue;
@Slf4j
public class InflightReadsLimiter {
@@ -37,6 +37,7 @@ public class InflightReadsLimiter {
.help("Estimated number of bytes retained by data read from
storage or cache")
.register();
+ private final int maxReadsInFlightAcquireQueueSize;
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_available_inflight_bytes")
@@ -64,9 +65,10 @@ public class InflightReadsLimiter {
this.remainingBytes = maxReadsInFlightSize;
this.acquireTimeoutMillis = acquireTimeoutMillis;
this.timeOutExecutor = timeOutExecutor;
+ this.maxReadsInFlightAcquireQueueSize =
maxReadsInFlightAcquireQueueSize;
if (maxReadsInFlightSize > 0) {
enabled = true;
- this.queuedHandles = new
SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
+ this.queuedHandles = new ArrayDeque<>();
} else {
enabled = false;
this.queuedHandles = null;
@@ -129,13 +131,14 @@ public class InflightReadsLimiter {
updateMetrics();
return Optional.of(new Handle(maxReadsInFlightSize,
handle.creationTime, true));
} else {
- if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
- scheduleTimeOutCheck(acquireTimeoutMillis);
- return Optional.empty();
- } else {
+ if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) {
log.warn("Failed to queue handle for acquiring permits: {},
creationTime: {}, remainingBytes:{}",
permits, handle.creationTime, remainingBytes);
return Optional.of(new Handle(0, handle.creationTime, false));
+ } else {
+ queuedHandles.offer(new QueuedHandle(handle, callback));
+ scheduleTimeOutCheck(acquireTimeoutMillis);
+ return Optional.empty();
}
}
}