This is an automated email from the ASF dual-hosted git repository.

psiace pushed a commit to branch port-6618
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 87e04ecd944f27e5b020ed42d8cad9ae29008e0e
Author: Chojan Shang <[email protected]>
AuthorDate: Mon Dec 22 15:45:02 2025 +0800

    feat(layers/concurrent-limit): accept custom semaphore without API break
    
    Signed-off-by: Chojan Shang <[email protected]>
---
 core/Cargo.lock                         |   1 +
 core/layers/concurrent-limit/Cargo.toml |   1 +
 core/layers/concurrent-limit/src/lib.rs | 288 ++++++++++++++++++++++++++------
 3 files changed, 239 insertions(+), 51 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index c5ca7a3a8..74a15c15f 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -5770,6 +5770,7 @@ dependencies = [
  "http 1.4.0",
  "mea",
  "opendal-core",
+ "tokio",
 ]
 
 [[package]]
diff --git a/core/layers/concurrent-limit/Cargo.toml b/core/layers/concurrent-limit/Cargo.toml
index da80d3e45..d57f8a2a0 100644
--- a/core/layers/concurrent-limit/Cargo.toml
+++ b/core/layers/concurrent-limit/Cargo.toml
@@ -38,3 +38,4 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal
 
 [dev-dependencies]
 opendal-core = { path = "../../core", version = "0.55.0" }
+tokio = { workspace = true, features = ["macros", "rt", "time"] }
diff --git a/core/layers/concurrent-limit/src/lib.rs b/core/layers/concurrent-limit/src/lib.rs
index bde32113e..f4d73b68e 100644
--- a/core/layers/concurrent-limit/src/lib.rs
+++ b/core/layers/concurrent-limit/src/lib.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
+use std::any::Any;
+use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::Context;
@@ -29,6 +30,25 @@ use mea::semaphore::Semaphore;
 use opendal_core::raw::*;
 use opendal_core::*;
 
+/// ConcurrentLimitSemaphore abstracts a semaphore-like concurrency primitive
+/// that yields an owned permit released on drop.
+pub trait ConcurrentLimitSemaphore: Send + Sync + Any + Clone + Unpin + 'static {
+    /// The owned permit type associated with the semaphore. Dropping it
+    /// must release the permit back to the semaphore.
+    type Permit: Send + Sync + 'static;
+
+    /// Acquire an owned permit asynchronously.
+    fn acquire(&self) -> impl Future<Output = Self::Permit> + MaybeSend;
+}
+
+impl ConcurrentLimitSemaphore for Arc<Semaphore> {
+    type Permit = OwnedSemaphorePermit;
+
+    async fn acquire(&self) -> Self::Permit {
+        self.clone().acquire_owned(1).await
+    }
+}
+
 /// Add concurrent request limit.
 ///
 /// # Notes
@@ -50,7 +70,6 @@ use opendal_core::*;
 /// # use opendal_core::services;
 /// # use opendal_core::Operator;
 /// # use opendal_core::Result;
-///
 /// # fn main() -> Result<()> {
 /// let _ = Operator::new(services::Memory::default())?
 ///     .layer(ConcurrentLimitLayer::new(1024))
@@ -66,7 +85,6 @@ use opendal_core::*;
 /// # use opendal_core::services;
 /// # use opendal_core::Operator;
 /// # use opendal_core::Result;
-///
 /// # fn main() -> Result<()> {
 /// let limit = ConcurrentLimitLayer::new(1024);
 ///
@@ -81,41 +99,67 @@ use opendal_core::*;
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ConcurrentLimitLayer {
-    operation_semaphore: Arc<Semaphore>,
-    http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
+    operation_semaphore: S,
+    http_semaphore: Option<S>,
 }
 
-impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<Arc<Semaphore>> {
+    /// Create a new `ConcurrentLimitLayer` with the specified number of
+    /// permits.
     ///
-    /// This permits will applied to all operations.
+    /// These permits will be applied to all operations.
     pub fn new(permits: usize) -> Self {
+        Self::with_semaphore(Arc::new(Semaphore::new(permits)))
+    }
+
+    /// Set a concurrent limit for HTTP requests.
+    ///
+    /// This convenience helper constructs a new semaphore with the specified
+    /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`].
+    /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+    /// a shared semaphore.
+    pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+        self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+    }
+}
+
+impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
+    /// Create a layer with any ConcurrentLimitSemaphore implementation.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use mea::semaphore::Semaphore;
+    /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
+    /// let semaphore = Arc::new(Semaphore::new(1024));
+    /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore);
+    /// ```
+    pub fn with_semaphore(operation_semaphore: S) -> Self {
         Self {
-            operation_semaphore: Arc::new(Semaphore::new(permits)),
+            operation_semaphore,
             http_semaphore: None,
         }
     }
 
-    /// Set a concurrent limit for HTTP requests.
-    ///
-    /// This will limit the number of concurrent HTTP requests made by the
-    /// operator.
-    pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
-        self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+    /// Provide a custom HTTP concurrency semaphore instance.
+    pub fn with_http_semaphore(mut self, semaphore: S) -> Self {
+        self.http_semaphore = Some(semaphore);
         self
     }
 }
 
-impl<A: Access> Layer<A> for ConcurrentLimitLayer {
-    type LayeredAccess = ConcurrentLimitAccessor<A>;
+impl<A: Access, S: ConcurrentLimitSemaphore> Layer<A> for ConcurrentLimitLayer<S>
+where
+    S::Permit: Unpin,
+{
+    type LayeredAccess = ConcurrentLimitAccessor<A, S>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
         let info = inner.info();
 
         // Update http client with metrics http fetcher.
         info.update_http_client(|client| {
-            HttpClient::with(ConcurrentLimitHttpFetcher {
+            HttpClient::with(ConcurrentLimitHttpFetcher::<S> {
                 inner: client.into_inner(),
                 http_semaphore: self.http_semaphore.clone(),
             })
@@ -128,23 +172,26 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer {
     }
 }
 
-pub struct ConcurrentLimitHttpFetcher {
+pub struct ConcurrentLimitHttpFetcher<S: ConcurrentLimitSemaphore> {
     inner: HttpFetcher,
-    http_semaphore: Option<Arc<Semaphore>>,
+    http_semaphore: Option<S>,
 }
 
-impl HttpFetch for ConcurrentLimitHttpFetcher {
+impl<S: ConcurrentLimitSemaphore> HttpFetch for ConcurrentLimitHttpFetcher<S>
+where
+    S::Permit: Unpin,
+{
     async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
         let Some(semaphore) = self.http_semaphore.clone() else {
             return self.inner.fetch(req).await;
         };
 
-        let permit = semaphore.acquire_owned(1).await;
+        let permit = semaphore.acquire().await;
 
         let resp = self.inner.fetch(req).await?;
         let (parts, body) = resp.into_parts();
         let body = body.map_inner(|s| {
-            Box::new(ConcurrentLimitStream {
+            Box::new(ConcurrentLimitStream::<_, S::Permit> {
                 inner: s,
                 _permit: permit,
             })
@@ -153,48 +200,62 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
     }
 }
 
-pub struct ConcurrentLimitStream<S> {
+pub struct ConcurrentLimitStream<S, P> {
     inner: S,
     // Hold on this permit until this reader has been dropped.
-    _permit: OwnedSemaphorePermit,
+    _permit: P,
 }
 
-impl<S> Stream for ConcurrentLimitStream<S>
+impl<S, P> Stream for ConcurrentLimitStream<S, P>
 where
     S: Stream<Item = Result<Buffer>> + Unpin + 'static,
+    P: Unpin,
 {
     type Item = Result<Buffer>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Safe due to Unpin bounds on S and P (thus on Self).
+        let this = self.get_mut();
+        this.inner.poll_next_unpin(cx)
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct ConcurrentLimitAccessor<A: Access> {
+#[derive(Clone)]
+pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> {
     inner: A,
-    semaphore: Arc<Semaphore>,
+    semaphore: S,
 }
 
-impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
+impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimitAccessor<A, S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ConcurrentLimitAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<A: Access, S: ConcurrentLimitSemaphore> LayeredAccess for ConcurrentLimitAccessor<A, S>
+where
+    S::Permit: Unpin,
+{
     type Inner = A;
-    type Reader = ConcurrentLimitWrapper<A::Reader>;
-    type Writer = ConcurrentLimitWrapper<A::Writer>;
-    type Lister = ConcurrentLimitWrapper<A::Lister>;
-    type Deleter = ConcurrentLimitWrapper<A::Deleter>;
+    type Reader = ConcurrentLimitWrapper<A::Reader, S::Permit>;
+    type Writer = ConcurrentLimitWrapper<A::Writer, S::Permit>;
+    type Lister = ConcurrentLimitWrapper<A::Lister, S::Permit>;
+    type Deleter = ConcurrentLimitWrapper<A::Deleter, S::Permit>;
 
     fn inner(&self) -> &Self::Inner {
         &self.inner
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        let _permit = self.semaphore.acquire(1).await;
+        let _permit = self.semaphore.acquire().await;
 
         self.inner.create_dir(path, args).await
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .read(path, args)
@@ -203,7 +264,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .write(path, args)
@@ -212,13 +273,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let _permit = self.semaphore.acquire(1).await;
+        let _permit = self.semaphore.acquire().await;
 
         self.inner.stat(path, args).await
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .delete()
@@ -227,7 +288,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .list(path, args)
@@ -236,15 +297,15 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 }
 
-pub struct ConcurrentLimitWrapper<R> {
+pub struct ConcurrentLimitWrapper<R, P> {
     inner: R,
 
     // Hold on this permit until this reader has been dropped.
-    _permit: OwnedSemaphorePermit,
+    _permit: P,
 }
 
-impl<R> ConcurrentLimitWrapper<R> {
-    fn new(inner: R, permit: OwnedSemaphorePermit) -> Self {
+impl<R, P> ConcurrentLimitWrapper<R, P> {
+    fn new(inner: R, permit: P) -> Self {
         Self {
             inner,
             _permit: permit,
@@ -252,13 +313,13 @@ impl<R> ConcurrentLimitWrapper<R> {
     }
 }
 
-impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
+impl<R: oio::Read, P: Send + Sync + 'static + Unpin> oio::Read for ConcurrentLimitWrapper<R, P> {
     async fn read(&mut self) -> Result<Buffer> {
         self.inner.read().await
     }
 }
 
-impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
+impl<R: oio::Write, P: Send + Sync + 'static + Unpin> oio::Write for ConcurrentLimitWrapper<R, P> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
@@ -272,13 +333,15 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
     }
 }
 
-impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
+impl<R: oio::List, P: Send + Sync + 'static + Unpin> oio::List for ConcurrentLimitWrapper<R, P> {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
         self.inner.next().await
     }
 }
 
-impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
+impl<R: oio::Delete, P: Send + Sync + 'static + Unpin> oio::Delete
+    for ConcurrentLimitWrapper<R, P>
+{
     async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         self.inner.delete(path, args).await
     }
@@ -287,3 +350,126 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
         self.inner.close().await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use opendal_core::Operator;
+    use opendal_core::services;
+    use std::collections::VecDeque;
+    use std::sync::Arc;
+    use std::sync::Mutex as StdMutex;
+    use std::time::Duration;
+    use tokio::time::timeout;
+
+    use futures::channel::oneshot;
+
+    // A minimal non-mea semaphore for testing the trait abstraction.
+    #[derive(Clone)]
+    struct SimpleSemaphore {
+        state: Arc<StdMutex<SimpleState>>,
+    }
+
+    struct SimpleState {
+        available: usize,
+        waiters: VecDeque<oneshot::Sender<()>>,
+    }
+
+    impl SimpleSemaphore {
+        fn new(permits: usize) -> Self {
+            Self {
+                state: Arc::new(StdMutex::new(SimpleState {
+                    available: permits,
+                    waiters: VecDeque::new(),
+                })),
+            }
+        }
+    }
+
+    struct SimplePermit(Arc<StdMutex<SimpleState>>);
+
+    impl Drop for SimplePermit {
+        fn drop(&mut self) {
+            let mut st = self.0.lock().expect("mutex poisoned");
+            st.available += 1;
+            if let Some(tx) = st.waiters.pop_front() {
+                let _ = tx.send(());
+            }
+        }
+    }
+
+    impl ConcurrentLimitSemaphore for SimpleSemaphore {
+        type Permit = SimplePermit;
+
+        async fn acquire(&self) -> Self::Permit {
+            let state = self.state.clone();
+            loop {
+                let acquired = {
+                    let mut st = state.lock().expect("mutex poisoned");
+                    if st.available > 0 {
+                        st.available -= 1;
+                        true
+                    } else {
+                        false
+                    }
+                };
+                if acquired {
+                    return SimplePermit(state.clone());
+                }
+
+                let (tx, rx) = oneshot::channel();
+                {
+                    let mut st = state.lock().expect("mutex poisoned");
+                    st.waiters.push_back(tx);
+                }
+                let _ = rx.await;
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn operation_semaphore_can_be_shared() {
+        let semaphore = Arc::new(Semaphore::new(1));
+        let layer = ConcurrentLimitLayer::with_semaphore(semaphore.clone());
+
+        let permit = semaphore.clone().acquire_owned(1).await;
+
+        let op = Operator::new(services::Memory::default())
+            .expect("operator must build")
+            .layer(layer)
+            .finish();
+
+        let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+        assert!(
+            blocked.is_err(),
+            "operation should be limited by shared semaphore"
+        );
+
+        drop(permit);
+
+        let completed = timeout(Duration::from_millis(50), op.stat("any")).await;
+        assert!(
+            completed.is_ok(),
+            "operation should proceed once permit is released"
+        );
+    }
+
+    #[tokio::test]
+    async fn custom_semaphore_is_honored() {
+        let custom = SimpleSemaphore::new(1);
+        let layer = ConcurrentLimitLayer::with_semaphore(custom);
+
+        let op = Operator::new(services::Memory::default())
+            .expect("operator must build")
+            .layer(layer)
+            .finish();
+
+        let _l = op.lister("").await.expect("list should start");
+
+        let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+        assert!(
+            blocked.is_err(),
+            "operation should be limited by custom semaphore"
+        );
+    }
+}

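A minimal downstream sketch (not part of this patch) of what the new extension point allows: implementing ConcurrentLimitSemaphore over a tokio semaphore and passing it in through with_semaphore. The TokioConcurrencyLimit wrapper and the tokio dependency are illustrative assumptions, not code from this commit.

    // Sketch only: a hypothetical adapter over tokio::sync::Semaphore.
    use std::sync::Arc;

    use opendal_core::{services, Operator, Result};
    use opendal_layer_concurrent_limit::{ConcurrentLimitLayer, ConcurrentLimitSemaphore};
    use tokio::sync::{OwnedSemaphorePermit, Semaphore as TokioSemaphore};

    #[derive(Clone)]
    struct TokioConcurrencyLimit(Arc<TokioSemaphore>);

    impl ConcurrentLimitSemaphore for TokioConcurrencyLimit {
        type Permit = OwnedSemaphorePermit;

        async fn acquire(&self) -> Self::Permit {
            // acquire_owned only fails if the semaphore is closed, which this
            // wrapper never does, so the expect is unreachable in practice.
            self.0.clone().acquire_owned().await.expect("semaphore closed")
        }
    }

    fn build_operator() -> Result<Operator> {
        // A shared limiter can be reused across operators instead of the
        // fixed-permit ConcurrentLimitLayer::new(permits) constructor.
        let limiter = TokioConcurrencyLimit(Arc::new(TokioSemaphore::new(64)));
        Ok(Operator::new(services::Memory::default())?
            .layer(ConcurrentLimitLayer::with_semaphore(limiter))
            .finish())
    }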