This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 68f8dea33 feat(core): Allow add concurrent limit for in-progress http
requests (#6025)
68f8dea33 is described below
commit 68f8dea33c4dc4efa18c68847bb76995696a1c60
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 15 18:25:41 2025 +0800
feat(core): Allow add concurrent limit for in-progress http requests (#6025)
* feat(core): Allow add concurrent limit for in-progress http requests
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/concurrent_limit.rs | 113 ++++++++++++++++++++++++++++++++++--
1 file changed, 109 insertions(+), 4 deletions(-)
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 7152a977d..738a1e034 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -16,8 +16,13 @@
// under the License.
use std::fmt::Debug;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use futures::Stream;
+use futures::StreamExt;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
@@ -31,8 +36,15 @@ use crate::*;
/// Users can control how many concurrent connections could be established
/// between OpenDAL and underlying storage services.
///
+/// All operators wrapped by this layer will share a common semaphore. This
+/// allows you to reuse the same layer across multiple operators, ensuring
+/// that the total number of concurrent requests across the entire
+/// application does not exceed the limit.
+///
/// # Examples
///
+/// Add a concurrent limit layer to the operator:
+///
/// ```no_run
/// # use opendal::layers::ConcurrentLimitLayer;
/// # use opendal::services;
@@ -47,15 +59,53 @@ use crate::*;
/// Ok(())
/// # }
/// ```
+///
+/// Share a concurrent limit layer between the operators:
+///
+/// ```no_run
+/// # use opendal::layers::ConcurrentLimitLayer;
+/// # use opendal::services;
+/// # use opendal::Operator;
+/// # use opendal::Result;
+/// # use opendal::Scheme;
+///
+/// # fn main() -> Result<()> {
+/// let limit = ConcurrentLimitLayer::new(1024);
+///
+/// let _operator_a = Operator::new(services::Memory::default())?
+/// .layer(limit.clone())
+/// .finish();
+/// let _operator_b = Operator::new(services::Memory::default())?
+/// .layer(limit.clone())
+/// .finish();
+///
+/// Ok(())
+/// # }
+/// ```
#[derive(Clone)]
pub struct ConcurrentLimitLayer {
- permits: usize,
+ operation_semaphore: Arc<Semaphore>,
+ http_semaphore: Option<Arc<Semaphore>>,
}
impl ConcurrentLimitLayer {
- /// Create a new ConcurrentLimitLayer will specify permits
+ /// Create a new ConcurrentLimitLayer with the specified number of permits.
+ ///
+ /// These permits will be applied to all operations.
pub fn new(permits: usize) -> Self {
- Self { permits }
+ Self {
+ operation_semaphore: Arc::new(Semaphore::new(permits)),
+ http_semaphore: None,
+ }
+ }
+
+ /// Set a concurrent limit for HTTP requests.
+ ///
+ /// This will limit the number of concurrent HTTP requests made by the
+ /// operator.
+ pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
+ self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+ self
}
}
@@ -63,13 +113,68 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer {
type LayeredAccess = ConcurrentLimitAccessor<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
+ let info = inner.info();
+
+ // Wrap the http client with the concurrent limit http fetcher.
+ info.update_http_client(|client| {
+ HttpClient::with(ConcurrentLimitHttpFetcher {
+ inner: client.into_inner(),
+ http_semaphore: self.http_semaphore.clone(),
+ })
+ });
+
ConcurrentLimitAccessor {
inner,
- semaphore: Arc::new(Semaphore::new(self.permits)),
+ semaphore: self.operation_semaphore.clone(),
}
}
}
+pub struct ConcurrentLimitHttpFetcher {
+ inner: HttpFetcher,
+ http_semaphore: Option<Arc<Semaphore>>,
+}
+
+impl HttpFetch for ConcurrentLimitHttpFetcher {
+ async fn fetch(&self, req: http::Request<Buffer>) ->
Result<http::Response<HttpBody>> {
+ let Some(semaphore) = self.http_semaphore.clone() else {
+ return self.inner.fetch(req).await;
+ };
+
+ let permit = semaphore
+ .acquire_owned()
+ .await
+ .expect("semaphore must be valid");
+
+ let resp = self.inner.fetch(req).await?;
+ let (parts, body) = resp.into_parts();
+ let body = body.map_inner(|s| {
+ Box::new(ConcurrentLimitStream {
+ inner: s,
+ _permit: permit,
+ })
+ });
+ Ok(http::Response::from_parts(parts, body))
+ }
+}
+
+pub struct ConcurrentLimitStream<S> {
+ inner: S,
+ // Hold on to this permit until this stream has been dropped.
+ _permit: OwnedSemaphorePermit,
+}
+
+impl<S> Stream for ConcurrentLimitStream<S>
+where
+ S: Stream<Item = Result<Buffer>> + Unpin + 'static,
+{
+ type Item = Result<Buffer>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ self.inner.poll_next_unpin(cx)
+ }
+}
+
#[derive(Debug, Clone)]
pub struct ConcurrentLimitAccessor<A: Access> {
inner: A,