This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 68f8dea33 feat(core): Allow add concurrent limit for in-progress http requests (#6025)
68f8dea33 is described below

commit 68f8dea33c4dc4efa18c68847bb76995696a1c60
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 15 18:25:41 2025 +0800

    feat(core): Allow add concurrent limit for in-progress http requests (#6025)
    
    * feat(core): Allow add concurrent limit for in-progress http requests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/concurrent_limit.rs | 113 ++++++++++++++++++++++++++++++++++--
 1 file changed, 109 insertions(+), 4 deletions(-)

diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 7152a977d..738a1e034 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -16,8 +16,13 @@
 // under the License.
 
 use std::fmt::Debug;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
 
+use futures::Stream;
+use futures::StreamExt;
 use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 
@@ -31,8 +36,15 @@ use crate::*;
 /// Users can control how many concurrent connections could be established
 /// between OpenDAL and underlying storage services.
 ///
+/// All operators wrapped by this layer will share a common semaphore. This
+/// allows you to reuse the same layer across multiple operators, ensuring
+/// that the total number of concurrent requests across the entire
+/// application does not exceed the limit.
+///
 /// # Examples
 ///
+/// Add a concurrent limit layer to the operator:
+///
 /// ```no_run
 /// # use opendal::layers::ConcurrentLimitLayer;
 /// # use opendal::services;
@@ -47,15 +59,53 @@ use crate::*;
 /// Ok(())
 /// # }
 /// ```
+///
+/// Share a concurrent limit layer between the operators:
+///
+/// ```no_run
+/// # use opendal::layers::ConcurrentLimitLayer;
+/// # use opendal::services;
+/// # use opendal::Operator;
+/// # use opendal::Result;
+/// # use opendal::Scheme;
+///
+/// # fn main() -> Result<()> {
+/// let limit = ConcurrentLimitLayer::new(1024);
+///
+/// let _operator_a = Operator::new(services::Memory::default())?
+///     .layer(limit.clone())
+///     .finish();
+/// let _operator_b = Operator::new(services::Memory::default())?
+///     .layer(limit.clone())
+///     .finish();
+///
+/// Ok(())
+/// # }
+/// ```
 #[derive(Clone)]
 pub struct ConcurrentLimitLayer {
-    permits: usize,
+    operation_semaphore: Arc<Semaphore>,
+    http_semaphore: Option<Arc<Semaphore>>,
 }
 
 impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits
+    /// Create a new `ConcurrentLimitLayer` with the specified number of permits.
+    ///
+    /// These permits will be applied to all operations.
     pub fn new(permits: usize) -> Self {
-        Self { permits }
+        Self {
+            operation_semaphore: Arc::new(Semaphore::new(permits)),
+            http_semaphore: None,
+        }
+    }
+
+    /// Set a concurrent limit for HTTP requests.
+    ///
+    /// This will limit the number of concurrent HTTP requests made by the
+    /// operator.
+    pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
+        self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+        self
     }
 }
 
@@ -63,13 +113,68 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer {
     type LayeredAccess = ConcurrentLimitAccessor<A>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
+        let info = inner.info();
+
+        // Wrap the http client so that requests honor the optional http concurrent limit.
+        info.update_http_client(|client| {
+            HttpClient::with(ConcurrentLimitHttpFetcher {
+                inner: client.into_inner(),
+                http_semaphore: self.http_semaphore.clone(),
+            })
+        });
+
         ConcurrentLimitAccessor {
             inner,
-            semaphore: Arc::new(Semaphore::new(self.permits)),
+            semaphore: self.operation_semaphore.clone(),
         }
     }
 }
 
+pub struct ConcurrentLimitHttpFetcher {
+    inner: HttpFetcher,
+    http_semaphore: Option<Arc<Semaphore>>,
+}
+
+impl HttpFetch for ConcurrentLimitHttpFetcher {
+    async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
+        let Some(semaphore) = self.http_semaphore.clone() else {
+            return self.inner.fetch(req).await;
+        };
+
+        let permit = semaphore
+            .acquire_owned()
+            .await
+            .expect("semaphore must be valid");
+
+        let resp = self.inner.fetch(req).await?;
+        let (parts, body) = resp.into_parts();
+        let body = body.map_inner(|s| {
+            Box::new(ConcurrentLimitStream {
+                inner: s,
+                _permit: permit,
+            })
+        });
+        Ok(http::Response::from_parts(parts, body))
+    }
+}
+
+pub struct ConcurrentLimitStream<S> {
+    inner: S,
+    // Hold on to this permit until this stream has been dropped.
+    _permit: OwnedSemaphorePermit,
+}
+
+impl<S> Stream for ConcurrentLimitStream<S>
+where
+    S: Stream<Item = Result<Buffer>> + Unpin + 'static,
+{
+    type Item = Result<Buffer>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct ConcurrentLimitAccessor<A: Access> {
     inner: A,

Reply via email to