This is an automated email from the ASF dual-hosted git repository.

psiace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b23593ea3 feat(layers/concurrent-limit): accept custom semaphore without API break (#7082)
b23593ea3 is described below

commit b23593ea323609f1bc3edace3efd8bac19ce57dd
Author: Chojan Shang <[email protected]>
AuthorDate: Tue Jan 20 11:05:24 2026 +0800

    feat(layers/concurrent-limit): accept custom semaphore without API break (#7082)
    
    * feat(layers/concurrent-limit): accept custom semaphore without API break
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/concurrent-limit): drop Any bound and refresh http test
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    ---------
    
    Signed-off-by: Chojan Shang <[email protected]>
---
 core/Cargo.lock                         |   1 +
 core/layers/concurrent-limit/Cargo.toml |   1 +
 core/layers/concurrent-limit/src/lib.rs | 244 +++++++++++++++++++++++++-------
 3 files changed, 197 insertions(+), 49 deletions(-)
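
The change keeps `ConcurrentLimitLayer::new` and `with_http_concurrent_limit` as-is and adds `with_semaphore` / `with_http_semaphore` so callers can supply their own semaphore. A minimal usage sketch of that surface, not taken from this commit: the `build_operators` helper, the Memory service, and the permit counts are illustrative assumptions only.

    use std::sync::Arc;

    use mea::semaphore::Semaphore;
    use opendal_core::services;
    use opendal_core::Operator;
    use opendal_core::Result;
    use opendal_layer_concurrent_limit::ConcurrentLimitLayer;

    fn build_operators() -> Result<(Operator, Operator)> {
        // One permit pool shared by both operators, so their combined
        // concurrency is capped at 64 in-flight operations.
        let shared = Arc::new(Semaphore::new(64));
        let layer = ConcurrentLimitLayer::with_semaphore(shared)
            // A separate pool for HTTP requests; `with_http_semaphore`
            // would accept a caller-supplied semaphore instead.
            .with_http_concurrent_limit(32);

        let a = Operator::new(services::Memory::default())?
            .layer(layer.clone())
            .finish();
        let b = Operator::new(services::Memory::default())?
            .layer(layer)
            .finish();

        // The pre-existing constructor keeps working unchanged.
        let _unchanged = ConcurrentLimitLayer::new(1024);

        Ok((a, b))
    }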

diff --git a/core/Cargo.lock b/core/Cargo.lock
index bd9e70adf..661025134 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6116,6 +6116,7 @@ dependencies = [
  "http 1.4.0",
  "mea 0.6.0",
  "opendal-core",
+ "tokio",
 ]
 
 [[package]]
diff --git a/core/layers/concurrent-limit/Cargo.toml b/core/layers/concurrent-limit/Cargo.toml
index 0960ab099..eb9656389 100644
--- a/core/layers/concurrent-limit/Cargo.toml
+++ b/core/layers/concurrent-limit/Cargo.toml
@@ -38,3 +38,4 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal
 
 [dev-dependencies]
 opendal-core = { path = "../../core", version = "0.55.0" }
+tokio = { workspace = true, features = ["macros", "rt", "time"] }
diff --git a/core/layers/concurrent-limit/src/lib.rs b/core/layers/concurrent-limit/src/lib.rs
index 53c15b208..ce0ef012d 100644
--- a/core/layers/concurrent-limit/src/lib.rs
+++ b/core/layers/concurrent-limit/src/lib.rs
@@ -20,6 +20,7 @@
 #![cfg_attr(docsrs, feature(doc_cfg))]
 #![deny(missing_docs)]
 
+use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::Context;
@@ -32,6 +33,25 @@ use mea::semaphore::Semaphore;
 use opendal_core::raw::*;
 use opendal_core::*;
 
+/// ConcurrentLimitSemaphore abstracts a semaphore-like concurrency primitive
+/// that yields an owned permit released on drop.
+pub trait ConcurrentLimitSemaphore: Send + Sync + Clone + Unpin + 'static {
+    /// The owned permit type associated with the semaphore. Dropping it
+    /// must release the permit back to the semaphore.
+    type Permit: Send + Sync + 'static;
+
+    /// Acquire an owned permit asynchronously.
+    fn acquire(&self) -> impl Future<Output = Self::Permit> + MaybeSend;
+}
+
+impl ConcurrentLimitSemaphore for Arc<Semaphore> {
+    type Permit = OwnedSemaphorePermit;
+
+    async fn acquire(&self) -> Self::Permit {
+        self.clone().acquire_owned(1).await
+    }
+}
+
 /// Add concurrent request limit.
 ///
 /// # Notes
@@ -83,41 +103,67 @@ use opendal_core::*;
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ConcurrentLimitLayer {
-    operation_semaphore: Arc<Semaphore>,
-    http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
+    operation_semaphore: S,
+    http_semaphore: Option<S>,
 }
 
-impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<Arc<Semaphore>> {
+    /// Create a new `ConcurrentLimitLayer` with the specified number of
+    /// permits.
     ///
-    /// This permits will applied to all operations.
+    /// These permits will be applied to all operations.
     pub fn new(permits: usize) -> Self {
+        Self::with_semaphore(Arc::new(Semaphore::new(permits)))
+    }
+
+    /// Set a concurrent limit for HTTP requests.
+    ///
+    /// This convenience helper constructs a new semaphore with the specified
+    /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`].
+    /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+    /// a shared semaphore.
+    pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+        self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+    }
+}
+
+impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
+    /// Create a layer with any ConcurrentLimitSemaphore implementation.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use mea::semaphore::Semaphore;
+    /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
+    /// let semaphore = Arc::new(Semaphore::new(1024));
+    /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore);
+    /// ```
+    pub fn with_semaphore(operation_semaphore: S) -> Self {
         Self {
-            operation_semaphore: Arc::new(Semaphore::new(permits)),
+            operation_semaphore,
             http_semaphore: None,
         }
     }
 
-    /// Set a concurrent limit for HTTP requests.
-    ///
-    /// This will limit the number of concurrent HTTP requests made by the
-    /// operator.
-    pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
-        self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+    /// Provide a custom HTTP concurrency semaphore instance.
+    pub fn with_http_semaphore(mut self, semaphore: S) -> Self {
+        self.http_semaphore = Some(semaphore);
         self
     }
 }
 
-impl<A: Access> Layer<A> for ConcurrentLimitLayer {
-    type LayeredAccess = ConcurrentLimitAccessor<A>;
+impl<A: Access, S: ConcurrentLimitSemaphore> Layer<A> for ConcurrentLimitLayer<S>
+where
+    S::Permit: Unpin,
+{
+    type LayeredAccess = ConcurrentLimitAccessor<A, S>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
         let info = inner.info();
 
-        // Update http client with metrics http fetcher.
+        // Update http client with concurrent limit http fetcher.
         info.update_http_client(|client| {
-            HttpClient::with(ConcurrentLimitHttpFetcher {
+            HttpClient::with(ConcurrentLimitHttpFetcher::<S> {
                 inner: client.into_inner(),
                 http_semaphore: self.http_semaphore.clone(),
             })
@@ -131,23 +177,26 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer {
 }
 
 #[doc(hidden)]
-pub struct ConcurrentLimitHttpFetcher {
+pub struct ConcurrentLimitHttpFetcher<S: ConcurrentLimitSemaphore> {
     inner: HttpFetcher,
-    http_semaphore: Option<Arc<Semaphore>>,
+    http_semaphore: Option<S>,
 }
 
-impl HttpFetch for ConcurrentLimitHttpFetcher {
+impl<S: ConcurrentLimitSemaphore> HttpFetch for ConcurrentLimitHttpFetcher<S>
+where
+    S::Permit: Unpin,
+{
     async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
         let Some(semaphore) = self.http_semaphore.clone() else {
             return self.inner.fetch(req).await;
         };
 
-        let permit = semaphore.acquire_owned(1).await;
+        let permit = semaphore.acquire().await;
 
         let resp = self.inner.fetch(req).await?;
         let (parts, body) = resp.into_parts();
         let body = body.map_inner(|s| {
-            Box::new(ConcurrentLimitStream {
+            Box::new(ConcurrentLimitStream::<_, S::Permit> {
                 inner: s,
                 _permit: permit,
             })
@@ -156,49 +205,63 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
     }
 }
 
-struct ConcurrentLimitStream<S> {
+struct ConcurrentLimitStream<S, P> {
     inner: S,
     // Hold on this permit until this reader has been dropped.
-    _permit: OwnedSemaphorePermit,
+    _permit: P,
 }
 
-impl<S> Stream for ConcurrentLimitStream<S>
+impl<S, P> Stream for ConcurrentLimitStream<S, P>
 where
     S: Stream<Item = Result<Buffer>> + Unpin + 'static,
+    P: Unpin,
 {
     type Item = Result<Buffer>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Safe due to Unpin bounds on S and P (thus on Self).
+        let this = self.get_mut();
+        this.inner.poll_next_unpin(cx)
     }
 }
 
 #[doc(hidden)]
-#[derive(Debug)]
-pub struct ConcurrentLimitAccessor<A: Access> {
+#[derive(Clone)]
+pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> {
     inner: A,
-    semaphore: Arc<Semaphore>,
+    semaphore: S,
+}
+
+impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimitAccessor<A, S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ConcurrentLimitAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
 }
 
-impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
+impl<A: Access, S: ConcurrentLimitSemaphore> LayeredAccess for ConcurrentLimitAccessor<A, S>
+where
+    S::Permit: Unpin,
+{
     type Inner = A;
-    type Reader = ConcurrentLimitWrapper<A::Reader>;
-    type Writer = ConcurrentLimitWrapper<A::Writer>;
-    type Lister = ConcurrentLimitWrapper<A::Lister>;
-    type Deleter = ConcurrentLimitWrapper<A::Deleter>;
+    type Reader = ConcurrentLimitWrapper<A::Reader, S::Permit>;
+    type Writer = ConcurrentLimitWrapper<A::Writer, S::Permit>;
+    type Lister = ConcurrentLimitWrapper<A::Lister, S::Permit>;
+    type Deleter = ConcurrentLimitWrapper<A::Deleter, S::Permit>;
 
     fn inner(&self) -> &Self::Inner {
         &self.inner
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        let _permit = self.semaphore.acquire(1).await;
+        let _permit = self.semaphore.acquire().await;
 
         self.inner.create_dir(path, args).await
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .read(path, args)
@@ -207,7 +270,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .write(path, args)
@@ -216,13 +279,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let _permit = self.semaphore.acquire(1).await;
+        let _permit = self.semaphore.acquire().await;
 
         self.inner.stat(path, args).await
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .delete()
@@ -231,7 +294,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let permit = self.semaphore.clone().acquire_owned(1).await;
+        let permit = self.semaphore.acquire().await;
 
         self.inner
             .list(path, args)
@@ -241,15 +304,15 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
 }
 
 #[doc(hidden)]
-pub struct ConcurrentLimitWrapper<R> {
+pub struct ConcurrentLimitWrapper<R, P> {
     inner: R,
 
     // Hold on this permit until this reader has been dropped.
-    _permit: OwnedSemaphorePermit,
+    _permit: P,
 }
 
-impl<R> ConcurrentLimitWrapper<R> {
-    fn new(inner: R, permit: OwnedSemaphorePermit) -> Self {
+impl<R, P> ConcurrentLimitWrapper<R, P> {
+    fn new(inner: R, permit: P) -> Self {
         Self {
             inner,
             _permit: permit,
@@ -257,13 +320,13 @@ impl<R> ConcurrentLimitWrapper<R> {
     }
 }
 
-impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
+impl<R: oio::Read, P: Send + Sync + 'static + Unpin> oio::Read for ConcurrentLimitWrapper<R, P> {
     async fn read(&mut self) -> Result<Buffer> {
         self.inner.read().await
     }
 }
 
-impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
+impl<R: oio::Write, P: Send + Sync + 'static + Unpin> oio::Write for ConcurrentLimitWrapper<R, P> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
@@ -277,13 +340,15 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
     }
 }
 
-impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
+impl<R: oio::List, P: Send + Sync + 'static + Unpin> oio::List for ConcurrentLimitWrapper<R, P> {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
         self.inner.next().await
     }
 }
 
-impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
+impl<R: oio::Delete, P: Send + Sync + 'static + Unpin> oio::Delete
+    for ConcurrentLimitWrapper<R, P>
+{
     async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
         self.inner.delete(path, args).await
     }
@@ -292,3 +357,84 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
         self.inner.close().await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use opendal_core::Operator;
+    use opendal_core::services;
+    use std::sync::Arc;
+    use std::time::Duration;
+    use tokio::time::timeout;
+
+    use futures::stream;
+    use http::Response;
+
+    #[tokio::test]
+    async fn operation_semaphore_can_be_shared() {
+        let semaphore = Arc::new(Semaphore::new(1));
+        let layer = ConcurrentLimitLayer::with_semaphore(semaphore.clone());
+
+        let permit = semaphore.clone().acquire_owned(1).await;
+
+        let op = Operator::new(services::Memory::default())
+            .expect("operator must build")
+            .layer(layer)
+            .finish();
+
+        let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+        assert!(
+            blocked.is_err(),
+            "operation should be limited by shared semaphore"
+        );
+
+        drop(permit);
+
+        let completed = timeout(Duration::from_millis(50), op.stat("any")).await;
+        assert!(
+            completed.is_ok(),
+            "operation should proceed once permit is released"
+        );
+    }
+
+    #[tokio::test]
+    async fn http_semaphore_holds_until_body_dropped() {
+        struct DummyFetcher;
+
+        impl HttpFetch for DummyFetcher {
+            async fn fetch(&self, _req: http::Request<Buffer>) -> Result<Response<HttpBody>> {
+                let body = HttpBody::new(stream::empty(), None);
+                Ok(Response::builder()
+                    .status(http::StatusCode::OK)
+                    .body(body)
+                    .expect("response must build"))
+            }
+        }
+
+        let semaphore = Arc::new(Semaphore::new(1));
+        let layer = ConcurrentLimitLayer::new(1).with_http_semaphore(semaphore.clone());
+        let fetcher = ConcurrentLimitHttpFetcher::<Arc<Semaphore>> {
+            inner: HttpClient::with(DummyFetcher).into_inner(),
+            http_semaphore: layer.http_semaphore.clone(),
+        };
+
+        let request = http::Request::builder()
+            .uri("http://example.invalid/")
+            .body(Buffer::new())
+            .expect("request must build");
+        let _resp = fetcher
+            .fetch(request)
+            .await
+            .expect("first fetch should succeed");
+
+        let request = http::Request::builder()
+            .uri("http://example.invalid/")
+            .body(Buffer::new())
+            .expect("request must build");
+        let blocked = timeout(Duration::from_millis(50), fetcher.fetch(request)).await;
+        assert!(
+            blocked.is_err(),
+            "http fetch should block while the body holds the permit"
+        );
+    }
+}
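
Because `with_semaphore` is generic over the new `ConcurrentLimitSemaphore` trait, a caller can also plug in a different concurrency primitive behind a newtype (the newtype is needed since the orphan rule forbids implementing this foreign trait directly for `Arc<tokio::sync::Semaphore>`). A sketch under that assumption; `TokioLimiter` and its never-closed-semaphore `expect` are hypothetical, user-side code, not part of this commit, and assume a tokio dependency on the caller's side.

    use std::sync::Arc;

    use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
    use opendal_layer_concurrent_limit::ConcurrentLimitSemaphore;

    // Hypothetical adapter owning a tokio semaphore.
    #[derive(Clone)]
    struct TokioLimiter(Arc<tokio::sync::Semaphore>);

    impl ConcurrentLimitSemaphore for TokioLimiter {
        // Dropping the owned permit releases it back to the tokio semaphore,
        // which is what the trait contract requires.
        type Permit = tokio::sync::OwnedSemaphorePermit;

        async fn acquire(&self) -> Self::Permit {
            self.0
                .clone()
                .acquire_owned()
                .await
                .expect("semaphore is never closed in this sketch")
        }
    }

    fn tokio_backed_layer() -> ConcurrentLimitLayer<TokioLimiter> {
        // 256 concurrent operations, limited by tokio instead of mea.
        ConcurrentLimitLayer::with_semaphore(TokioLimiter(Arc::new(
            tokio::sync::Semaphore::new(256),
        )))
    }

tokio's `OwnedSemaphorePermit` is `Send + Sync + Unpin + 'static`, so it also satisfies the `S::Permit: Unpin` bound that the `Layer` implementation above adds.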
