This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 14cd10d8ddf [improve][ml] Use lock-free queue in InflightReadsLimiter 
since there's no concurrent access  (#23962)
14cd10d8ddf is described below

commit 14cd10d8ddfd80319f21ca730fb48818f17d16dc
Author: guan46 <[email protected]>
AuthorDate: Tue Feb 25 19:08:33 2025 +0800

    [improve][ml] Use lock-free queue in InflightReadsLimiter since there's no 
concurrent access  (#23962)
    
    (cherry picked from commit 38a41e0d29192d0e29cc172cccf6c187cf7cb542)
---
 .../mledger/impl/cache/InflightReadsLimiter.java          | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index 3a6bb3cd039..c2b9b82b694 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -20,13 +20,13 @@ package org.apache.bookkeeper.mledger.impl.cache;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.Gauge;
+import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
-import org.jctools.queues.SpscArrayQueue;
 
 @Slf4j
 public class InflightReadsLimiter {
@@ -37,6 +37,7 @@ public class InflightReadsLimiter {
             .help("Estimated number of bytes retained by data read from 
storage or cache")
             .register();
 
+    private final int maxReadsInFlightAcquireQueueSize;
     private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
             .build()
             .name("pulsar_ml_reads_available_inflight_bytes")
@@ -64,9 +65,10 @@ public class InflightReadsLimiter {
         this.remainingBytes = maxReadsInFlightSize;
         this.acquireTimeoutMillis = acquireTimeoutMillis;
         this.timeOutExecutor = timeOutExecutor;
+        this.maxReadsInFlightAcquireQueueSize = 
maxReadsInFlightAcquireQueueSize;
         if (maxReadsInFlightSize > 0) {
             enabled = true;
-            this.queuedHandles = new 
SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
+            this.queuedHandles = new ArrayDeque<>();
         } else {
             enabled = false;
             this.queuedHandles = null;
@@ -129,13 +131,14 @@ public class InflightReadsLimiter {
             updateMetrics();
             return Optional.of(new Handle(maxReadsInFlightSize, 
handle.creationTime, true));
         } else {
-            if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
-                scheduleTimeOutCheck(acquireTimeoutMillis);
-                return Optional.empty();
-            } else {
+            if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) {
                 log.warn("Failed to queue handle for acquiring permits: {}, 
creationTime: {}, remainingBytes:{}",
                         permits, handle.creationTime, remainingBytes);
                 return Optional.of(new Handle(0, handle.creationTime, false));
+            } else {
+                queuedHandles.offer(new QueuedHandle(handle, callback));
+                scheduleTimeOutCheck(acquireTimeoutMillis);
+                return Optional.empty();
             }
         }
     }

Reply via email to