This is an automated email from the ASF dual-hosted git repository.
psiace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b23593ea3 feat(layers/concurrent-limit): accept custom semaphore without API break (#7082)
b23593ea3 is described below
commit b23593ea323609f1bc3edace3efd8bac19ce57dd
Author: Chojan Shang <[email protected]>
AuthorDate: Tue Jan 20 11:05:24 2026 +0800
feat(layers/concurrent-limit): accept custom semaphore without API break (#7082)
* feat(layers/concurrent-limit): accept custom semaphore without API break
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/concurrent-limit): drop Any bound and refresh http test
Signed-off-by: Chojan Shang <[email protected]>
---------
Signed-off-by: Chojan Shang <[email protected]>
---
core/Cargo.lock | 1 +
core/layers/concurrent-limit/Cargo.toml | 1 +
core/layers/concurrent-limit/src/lib.rs | 244 +++++++++++++++++++++++++-------
3 files changed, 197 insertions(+), 49 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index bd9e70adf..661025134 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6116,6 +6116,7 @@ dependencies = [
"http 1.4.0",
"mea 0.6.0",
"opendal-core",
+ "tokio",
]
[[package]]
diff --git a/core/layers/concurrent-limit/Cargo.toml b/core/layers/concurrent-limit/Cargo.toml
index 0960ab099..eb9656389 100644
--- a/core/layers/concurrent-limit/Cargo.toml
+++ b/core/layers/concurrent-limit/Cargo.toml
@@ -38,3 +38,4 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal
[dev-dependencies]
opendal-core = { path = "../../core", version = "0.55.0" }
+tokio = { workspace = true, features = ["macros", "rt", "time"] }
diff --git a/core/layers/concurrent-limit/src/lib.rs b/core/layers/concurrent-limit/src/lib.rs
index 53c15b208..ce0ef012d 100644
--- a/core/layers/concurrent-limit/src/lib.rs
+++ b/core/layers/concurrent-limit/src/lib.rs
@@ -20,6 +20,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]
+use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
@@ -32,6 +33,25 @@ use mea::semaphore::Semaphore;
use opendal_core::raw::*;
use opendal_core::*;
+/// ConcurrentLimitSemaphore abstracts a semaphore-like concurrency primitive
+/// that yields an owned permit released on drop.
+pub trait ConcurrentLimitSemaphore: Send + Sync + Clone + Unpin + 'static {
+ /// The owned permit type associated with the semaphore. Dropping it
+ /// must release the permit back to the semaphore.
+ type Permit: Send + Sync + 'static;
+
+ /// Acquire an owned permit asynchronously.
+ fn acquire(&self) -> impl Future<Output = Self::Permit> + MaybeSend;
+}
+
+impl ConcurrentLimitSemaphore for Arc<Semaphore> {
+ type Permit = OwnedSemaphorePermit;
+
+ async fn acquire(&self) -> Self::Permit {
+ self.clone().acquire_owned(1).await
+ }
+}
+
/// Add concurrent request limit.
///
/// # Notes
@@ -83,41 +103,67 @@ use opendal_core::*;
/// # }
/// ```
#[derive(Clone)]
-pub struct ConcurrentLimitLayer {
- operation_semaphore: Arc<Semaphore>,
- http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
+ operation_semaphore: S,
+ http_semaphore: Option<S>,
}
-impl ConcurrentLimitLayer {
- /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<Arc<Semaphore>> {
+ /// Create a new `ConcurrentLimitLayer` with the specified number of
+ /// permits.
///
- /// This permits will applied to all operations.
+ /// These permits will be applied to all operations.
pub fn new(permits: usize) -> Self {
+ Self::with_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+
+ /// Set a concurrent limit for HTTP requests.
+ ///
+ /// This convenience helper constructs a new semaphore with the specified
+ /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`].
+ /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+ /// a shared semaphore.
+ pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+ self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+}
+
+impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
+ /// Create a layer with any ConcurrentLimitSemaphore implementation.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use mea::semaphore::Semaphore;
+ /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
+ /// let semaphore = Arc::new(Semaphore::new(1024));
+ /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore);
+ /// ```
+ pub fn with_semaphore(operation_semaphore: S) -> Self {
Self {
- operation_semaphore: Arc::new(Semaphore::new(permits)),
+ operation_semaphore,
http_semaphore: None,
}
}
- /// Set a concurrent limit for HTTP requests.
- ///
- /// This will limit the number of concurrent HTTP requests made by the
- /// operator.
- pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
- self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+ /// Provide a custom HTTP concurrency semaphore instance.
+ pub fn with_http_semaphore(mut self, semaphore: S) -> Self {
+ self.http_semaphore = Some(semaphore);
self
}
}
-impl<A: Access> Layer<A> for ConcurrentLimitLayer {
- type LayeredAccess = ConcurrentLimitAccessor<A>;
+impl<A: Access, S: ConcurrentLimitSemaphore> Layer<A> for ConcurrentLimitLayer<S>
+where
+ S::Permit: Unpin,
+{
+ type LayeredAccess = ConcurrentLimitAccessor<A, S>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
let info = inner.info();
- // Update http client with metrics http fetcher.
+ // Update http client with concurrent limit http fetcher.
info.update_http_client(|client| {
- HttpClient::with(ConcurrentLimitHttpFetcher {
+ HttpClient::with(ConcurrentLimitHttpFetcher::<S> {
inner: client.into_inner(),
http_semaphore: self.http_semaphore.clone(),
})
@@ -131,23 +177,26 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer {
}
#[doc(hidden)]
-pub struct ConcurrentLimitHttpFetcher {
+pub struct ConcurrentLimitHttpFetcher<S: ConcurrentLimitSemaphore> {
inner: HttpFetcher,
- http_semaphore: Option<Arc<Semaphore>>,
+ http_semaphore: Option<S>,
}
-impl HttpFetch for ConcurrentLimitHttpFetcher {
+impl<S: ConcurrentLimitSemaphore> HttpFetch for ConcurrentLimitHttpFetcher<S>
+where
+ S::Permit: Unpin,
+{
async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
let Some(semaphore) = self.http_semaphore.clone() else {
return self.inner.fetch(req).await;
};
- let permit = semaphore.acquire_owned(1).await;
+ let permit = semaphore.acquire().await;
let resp = self.inner.fetch(req).await?;
let (parts, body) = resp.into_parts();
let body = body.map_inner(|s| {
- Box::new(ConcurrentLimitStream {
+ Box::new(ConcurrentLimitStream::<_, S::Permit> {
inner: s,
_permit: permit,
})
@@ -156,49 +205,63 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
}
}
-struct ConcurrentLimitStream<S> {
+struct ConcurrentLimitStream<S, P> {
inner: S,
// Hold on this permit until this reader has been dropped.
- _permit: OwnedSemaphorePermit,
+ _permit: P,
}
-impl<S> Stream for ConcurrentLimitStream<S>
+impl<S, P> Stream for ConcurrentLimitStream<S, P>
where
S: Stream<Item = Result<Buffer>> + Unpin + 'static,
+ P: Unpin,
{
type Item = Result<Buffer>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- self.inner.poll_next_unpin(cx)
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // Safe due to Unpin bounds on S and P (thus on Self).
+ let this = self.get_mut();
+ this.inner.poll_next_unpin(cx)
}
}
#[doc(hidden)]
-#[derive(Debug)]
-pub struct ConcurrentLimitAccessor<A: Access> {
+#[derive(Clone)]
+pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> {
inner: A,
- semaphore: Arc<Semaphore>,
+ semaphore: S,
+}
+
+impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimitAccessor<A, S> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ConcurrentLimitAccessor")
+ .field("inner", &self.inner)
+ .finish_non_exhaustive()
+ }
}
-impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
+impl<A: Access, S: ConcurrentLimitSemaphore> LayeredAccess for ConcurrentLimitAccessor<A, S>
+where
+ S::Permit: Unpin,
+{
type Inner = A;
- type Reader = ConcurrentLimitWrapper<A::Reader>;
- type Writer = ConcurrentLimitWrapper<A::Writer>;
- type Lister = ConcurrentLimitWrapper<A::Lister>;
- type Deleter = ConcurrentLimitWrapper<A::Deleter>;
+ type Reader = ConcurrentLimitWrapper<A::Reader, S::Permit>;
+ type Writer = ConcurrentLimitWrapper<A::Writer, S::Permit>;
+ type Lister = ConcurrentLimitWrapper<A::Lister, S::Permit>;
+ type Deleter = ConcurrentLimitWrapper<A::Deleter, S::Permit>;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
- let _permit = self.semaphore.acquire(1).await;
+ let _permit = self.semaphore.acquire().await;
self.inner.create_dir(path, args).await
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let permit = self.semaphore.clone().acquire_owned(1).await;
+ let permit = self.semaphore.acquire().await;
self.inner
.read(path, args)
@@ -207,7 +270,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let permit = self.semaphore.clone().acquire_owned(1).await;
+ let permit = self.semaphore.acquire().await;
self.inner
.write(path, args)
@@ -216,13 +279,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- let _permit = self.semaphore.acquire(1).await;
+ let _permit = self.semaphore.acquire().await;
self.inner.stat(path, args).await
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
- let permit = self.semaphore.clone().acquire_owned(1).await;
+ let permit = self.semaphore.acquire().await;
self.inner
.delete()
@@ -231,7 +294,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
- let permit = self.semaphore.clone().acquire_owned(1).await;
+ let permit = self.semaphore.acquire().await;
self.inner
.list(path, args)
@@ -241,15 +304,15 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
#[doc(hidden)]
-pub struct ConcurrentLimitWrapper<R> {
+pub struct ConcurrentLimitWrapper<R, P> {
inner: R,
// Hold on this permit until this reader has been dropped.
- _permit: OwnedSemaphorePermit,
+ _permit: P,
}
-impl<R> ConcurrentLimitWrapper<R> {
- fn new(inner: R, permit: OwnedSemaphorePermit) -> Self {
+impl<R, P> ConcurrentLimitWrapper<R, P> {
+ fn new(inner: R, permit: P) -> Self {
Self {
inner,
_permit: permit,
@@ -257,13 +320,13 @@ impl<R> ConcurrentLimitWrapper<R> {
}
}
-impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
+impl<R: oio::Read, P: Send + Sync + 'static + Unpin> oio::Read for ConcurrentLimitWrapper<R, P> {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}
-impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
+impl<R: oio::Write, P: Send + Sync + 'static + Unpin> oio::Write for ConcurrentLimitWrapper<R, P> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}
@@ -277,13 +340,15 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}
}
-impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
+impl<R: oio::List, P: Send + Sync + 'static + Unpin> oio::List for ConcurrentLimitWrapper<R, P> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}
-impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
+impl<R: oio::Delete, P: Send + Sync + 'static + Unpin> oio::Delete
+ for ConcurrentLimitWrapper<R, P>
+{
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args).await
}
@@ -292,3 +357,84 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
self.inner.close().await
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use opendal_core::Operator;
+ use opendal_core::services;
+ use std::sync::Arc;
+ use std::time::Duration;
+ use tokio::time::timeout;
+
+ use futures::stream;
+ use http::Response;
+
+ #[tokio::test]
+ async fn operation_semaphore_can_be_shared() {
+ let semaphore = Arc::new(Semaphore::new(1));
+ let layer = ConcurrentLimitLayer::with_semaphore(semaphore.clone());
+
+ let permit = semaphore.clone().acquire_owned(1).await;
+
+ let op = Operator::new(services::Memory::default())
+ .expect("operator must build")
+ .layer(layer)
+ .finish();
+
+ let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+ assert!(
+ blocked.is_err(),
+ "operation should be limited by shared semaphore"
+ );
+
+ drop(permit);
+
+ let completed = timeout(Duration::from_millis(50), op.stat("any")).await;
+ assert!(
+ completed.is_ok(),
+ "operation should proceed once permit is released"
+ );
+ }
+
+ #[tokio::test]
+ async fn http_semaphore_holds_until_body_dropped() {
+ struct DummyFetcher;
+
+ impl HttpFetch for DummyFetcher {
+ async fn fetch(&self, _req: http::Request<Buffer>) -> Result<Response<HttpBody>> {
+ let body = HttpBody::new(stream::empty(), None);
+ Ok(Response::builder()
+ .status(http::StatusCode::OK)
+ .body(body)
+ .expect("response must build"))
+ }
+ }
+
+ let semaphore = Arc::new(Semaphore::new(1));
+ let layer = ConcurrentLimitLayer::new(1).with_http_semaphore(semaphore.clone());
+ let fetcher = ConcurrentLimitHttpFetcher::<Arc<Semaphore>> {
+ inner: HttpClient::with(DummyFetcher).into_inner(),
+ http_semaphore: layer.http_semaphore.clone(),
+ };
+
+ let request = http::Request::builder()
+ .uri("http://example.invalid/")
+ .body(Buffer::new())
+ .expect("request must build");
+ let _resp = fetcher
+ .fetch(request)
+ .await
+ .expect("first fetch should succeed");
+
+ let request = http::Request::builder()
+ .uri("http://example.invalid/")
+ .body(Buffer::new())
+ .expect("request must build");
+ let blocked = timeout(Duration::from_millis(50), fetcher.fetch(request)).await;
+ assert!(
+ blocked.is_err(),
+ "http fetch should block while the body holds the permit"
+ );
+ }
+}