This is an automated email from the ASF dual-hosted git repository. psiace pushed a commit to branch port-6618 in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 87e04ecd944f27e5b020ed42d8cad9ae29008e0e Author: Chojan Shang <[email protected]> AuthorDate: Mon Dec 22 15:45:02 2025 +0800 feat(layers/concurrent-limit): accept custom semaphore without API break Signed-off-by: Chojan Shang <[email protected]> --- core/Cargo.lock | 1 + core/layers/concurrent-limit/Cargo.toml | 1 + core/layers/concurrent-limit/src/lib.rs | 288 ++++++++++++++++++++++++++------ 3 files changed, 239 insertions(+), 51 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index c5ca7a3a8..74a15c15f 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -5770,6 +5770,7 @@ dependencies = [ "http 1.4.0", "mea", "opendal-core", + "tokio", ] [[package]] diff --git a/core/layers/concurrent-limit/Cargo.toml b/core/layers/concurrent-limit/Cargo.toml index da80d3e45..d57f8a2a0 100644 --- a/core/layers/concurrent-limit/Cargo.toml +++ b/core/layers/concurrent-limit/Cargo.toml @@ -38,3 +38,4 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal [dev-dependencies] opendal-core = { path = "../../core", version = "0.55.0" } +tokio = { workspace = true, features = ["macros", "rt", "time"] } diff --git a/core/layers/concurrent-limit/src/lib.rs b/core/layers/concurrent-limit/src/lib.rs index bde32113e..f4d73b68e 100644 --- a/core/layers/concurrent-limit/src/lib.rs +++ b/core/layers/concurrent-limit/src/lib.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; +use std::any::Any; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -29,6 +30,25 @@ use mea::semaphore::Semaphore; use opendal_core::raw::*; use opendal_core::*; +/// ConcurrentLimitSemaphore abstracts a semaphore-like concurrency primitive +/// that yields an owned permit released on drop. +pub trait ConcurrentLimitSemaphore: Send + Sync + Any + Clone + Unpin + 'static { + /// The owned permit type associated with the semaphore. Dropping it + /// must release the permit back to the semaphore. + type Permit: Send + Sync + 'static; + + /// Acquire an owned permit asynchronously. + fn acquire(&self) -> impl Future<Output = Self::Permit> + MaybeSend; +} + +impl ConcurrentLimitSemaphore for Arc<Semaphore> { + type Permit = OwnedSemaphorePermit; + + async fn acquire(&self) -> Self::Permit { + self.clone().acquire_owned(1).await + } +} + /// Add concurrent request limit. /// /// # Notes @@ -50,7 +70,6 @@ use opendal_core::*; /// # use opendal_core::services; /// # use opendal_core::Operator; /// # use opendal_core::Result; -/// /// # fn main() -> Result<()> { /// let _ = Operator::new(services::Memory::default())? /// .layer(ConcurrentLimitLayer::new(1024)) @@ -66,7 +85,6 @@ use opendal_core::*; /// # use opendal_core::services; /// # use opendal_core::Operator; /// # use opendal_core::Result; -/// /// # fn main() -> Result<()> { /// let limit = ConcurrentLimitLayer::new(1024); /// @@ -81,41 +99,67 @@ use opendal_core::*; /// # } /// ``` #[derive(Clone)] -pub struct ConcurrentLimitLayer { - operation_semaphore: Arc<Semaphore>, - http_semaphore: Option<Arc<Semaphore>>, +pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> { + operation_semaphore: S, + http_semaphore: Option<S>, } -impl ConcurrentLimitLayer { - /// Create a new ConcurrentLimitLayer will specify permits. +impl ConcurrentLimitLayer<Arc<Semaphore>> { + /// Create a new `ConcurrentLimitLayer` with the specified number of + /// permits. /// - /// This permits will applied to all operations. + /// These permits will be applied to all operations. pub fn new(permits: usize) -> Self { + Self::with_semaphore(Arc::new(Semaphore::new(permits))) + } + + /// Set a concurrent limit for HTTP requests. + /// + /// This convenience helper constructs a new semaphore with the specified + /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`]. + /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing + /// a shared semaphore. + pub fn with_http_concurrent_limit(self, permits: usize) -> Self { + self.with_http_semaphore(Arc::new(Semaphore::new(permits))) + } +} + +impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> { + /// Create a layer with any ConcurrentLimitSemaphore implementation. + /// + /// ``` + /// # use std::sync::Arc; + /// # use mea::semaphore::Semaphore; + /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer; + /// let semaphore = Arc::new(Semaphore::new(1024)); + /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore); + /// ``` + pub fn with_semaphore(operation_semaphore: S) -> Self { Self { - operation_semaphore: Arc::new(Semaphore::new(permits)), + operation_semaphore, http_semaphore: None, } } - /// Set a concurrent limit for HTTP requests. - /// - /// This will limit the number of concurrent HTTP requests made by the - /// operator. - pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self { - self.http_semaphore = Some(Arc::new(Semaphore::new(permits))); + /// Provide a custom HTTP concurrency semaphore instance. + pub fn with_http_semaphore(mut self, semaphore: S) -> Self { + self.http_semaphore = Some(semaphore); self } } -impl<A: Access> Layer<A> for ConcurrentLimitLayer { - type LayeredAccess = ConcurrentLimitAccessor<A>; +impl<A: Access, S: ConcurrentLimitSemaphore> Layer<A> for ConcurrentLimitLayer<S> +where + S::Permit: Unpin, +{ + type LayeredAccess = ConcurrentLimitAccessor<A, S>; fn layer(&self, inner: A) -> Self::LayeredAccess { let info = inner.info(); // Update http client with metrics http fetcher. info.update_http_client(|client| { - HttpClient::with(ConcurrentLimitHttpFetcher { + HttpClient::with(ConcurrentLimitHttpFetcher::<S> { inner: client.into_inner(), http_semaphore: self.http_semaphore.clone(), }) @@ -128,23 +172,26 @@ impl<A: Access> Layer<A> for ConcurrentLimitLayer { } } -pub struct ConcurrentLimitHttpFetcher { +pub struct ConcurrentLimitHttpFetcher<S: ConcurrentLimitSemaphore> { inner: HttpFetcher, - http_semaphore: Option<Arc<Semaphore>>, + http_semaphore: Option<S>, } -impl HttpFetch for ConcurrentLimitHttpFetcher { +impl<S: ConcurrentLimitSemaphore> HttpFetch for ConcurrentLimitHttpFetcher<S> +where + S::Permit: Unpin, +{ async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> { let Some(semaphore) = self.http_semaphore.clone() else { return self.inner.fetch(req).await; }; - let permit = semaphore.acquire_owned(1).await; + let permit = semaphore.acquire().await; let resp = self.inner.fetch(req).await?; let (parts, body) = resp.into_parts(); let body = body.map_inner(|s| { - Box::new(ConcurrentLimitStream { + Box::new(ConcurrentLimitStream::<_, S::Permit> { inner: s, _permit: permit, }) @@ -153,48 +200,62 @@ impl HttpFetch for ConcurrentLimitHttpFetcher { } } -pub struct ConcurrentLimitStream<S> { +pub struct ConcurrentLimitStream<S, P> { inner: S, // Hold on this permit until this reader has been dropped. - _permit: OwnedSemaphorePermit, + _permit: P, } -impl<S> Stream for ConcurrentLimitStream<S> +impl<S, P> Stream for ConcurrentLimitStream<S, P> where S: Stream<Item = Result<Buffer>> + Unpin + 'static, + P: Unpin, { type Item = Result<Buffer>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - self.inner.poll_next_unpin(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // Safe due to Unpin bounds on S and P (thus on Self). + let this = self.get_mut(); + this.inner.poll_next_unpin(cx) } } -#[derive(Debug, Clone)] -pub struct ConcurrentLimitAccessor<A: Access> { +#[derive(Clone)] +pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> { inner: A, - semaphore: Arc<Semaphore>, + semaphore: S, } -impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { +impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimitAccessor<A, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConcurrentLimitAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl<A: Access, S: ConcurrentLimitSemaphore> LayeredAccess for ConcurrentLimitAccessor<A, S> +where + S::Permit: Unpin, +{ type Inner = A; - type Reader = ConcurrentLimitWrapper<A::Reader>; - type Writer = ConcurrentLimitWrapper<A::Writer>; - type Lister = ConcurrentLimitWrapper<A::Lister>; - type Deleter = ConcurrentLimitWrapper<A::Deleter>; + type Reader = ConcurrentLimitWrapper<A::Reader, S::Permit>; + type Writer = ConcurrentLimitWrapper<A::Writer, S::Permit>; + type Lister = ConcurrentLimitWrapper<A::Lister, S::Permit>; + type Deleter = ConcurrentLimitWrapper<A::Deleter, S::Permit>; fn inner(&self) -> &Self::Inner { &self.inner } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { - let _permit = self.semaphore.acquire(1).await; + let _permit = self.semaphore.acquire().await; self.inner.create_dir(path, args).await } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let permit = self.semaphore.clone().acquire_owned(1).await; + let permit = self.semaphore.acquire().await; self.inner .read(path, args) @@ -203,7 +264,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let permit = self.semaphore.clone().acquire_owned(1).await; + let permit = self.semaphore.acquire().await; self.inner .write(path, args) @@ -212,13 +273,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { - let _permit = self.semaphore.acquire(1).await; + let _permit = self.semaphore.acquire().await; self.inner.stat(path, args).await } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - let permit = self.semaphore.clone().acquire_owned(1).await; + let permit = self.semaphore.acquire().await; self.inner .delete() @@ -227,7 +288,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let permit = self.semaphore.clone().acquire_owned(1).await; + let permit = self.semaphore.acquire().await; self.inner .list(path, args) @@ -236,15 +297,15 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } } -pub struct ConcurrentLimitWrapper<R> { +pub struct ConcurrentLimitWrapper<R, P> { inner: R, // Hold on this permit until this reader has been dropped. - _permit: OwnedSemaphorePermit, + _permit: P, } -impl<R> ConcurrentLimitWrapper<R> { - fn new(inner: R, permit: OwnedSemaphorePermit) -> Self { +impl<R, P> ConcurrentLimitWrapper<R, P> { + fn new(inner: R, permit: P) -> Self { Self { inner, _permit: permit, @@ -252,13 +313,13 @@ impl<R> ConcurrentLimitWrapper<R> { } } -impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> { +impl<R: oio::Read, P: Send + Sync + 'static + Unpin> oio::Read for ConcurrentLimitWrapper<R, P> { async fn read(&mut self) -> Result<Buffer> { self.inner.read().await } } -impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { +impl<R: oio::Write, P: Send + Sync + 'static + Unpin> oio::Write for ConcurrentLimitWrapper<R, P> { async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -272,13 +333,15 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { } } -impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> { +impl<R: oio::List, P: Send + Sync + 'static + Unpin> oio::List for ConcurrentLimitWrapper<R, P> { async fn next(&mut self) -> Result<Option<oio::Entry>> { self.inner.next().await } } -impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> { +impl<R: oio::Delete, P: Send + Sync + 'static + Unpin> oio::Delete + for ConcurrentLimitWrapper<R, P> +{ async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.inner.delete(path, args).await } @@ -287,3 +350,126 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> { self.inner.close().await } } + +#[cfg(test)] +mod tests { + use super::*; + use opendal_core::Operator; + use opendal_core::services; + use std::collections::VecDeque; + use std::sync::Arc; + use std::sync::Mutex as StdMutex; + use std::time::Duration; + use tokio::time::timeout; + + use futures::channel::oneshot; + + // A minimal non-mea semaphore for testing the trait abstraction. + #[derive(Clone)] + struct SimpleSemaphore { + state: Arc<StdMutex<SimpleState>>, + } + + struct SimpleState { + available: usize, + waiters: VecDeque<oneshot::Sender<()>>, + } + + impl SimpleSemaphore { + fn new(permits: usize) -> Self { + Self { + state: Arc::new(StdMutex::new(SimpleState { + available: permits, + waiters: VecDeque::new(), + })), + } + } + } + + struct SimplePermit(Arc<StdMutex<SimpleState>>); + + impl Drop for SimplePermit { + fn drop(&mut self) { + let mut st = self.0.lock().expect("mutex poisoned"); + st.available += 1; + if let Some(tx) = st.waiters.pop_front() { + let _ = tx.send(()); + } + } + } + + impl ConcurrentLimitSemaphore for SimpleSemaphore { + type Permit = SimplePermit; + + async fn acquire(&self) -> Self::Permit { + let state = self.state.clone(); + loop { + let acquired = { + let mut st = state.lock().expect("mutex poisoned"); + if st.available > 0 { + st.available -= 1; + true + } else { + false + } + }; + if acquired { + return SimplePermit(state.clone()); + } + + let (tx, rx) = oneshot::channel(); + { + let mut st = state.lock().expect("mutex poisoned"); + st.waiters.push_back(tx); + } + let _ = rx.await; + } + } + } + + #[tokio::test] + async fn operation_semaphore_can_be_shared() { + let semaphore = Arc::new(Semaphore::new(1)); + let layer = ConcurrentLimitLayer::with_semaphore(semaphore.clone()); + + let permit = semaphore.clone().acquire_owned(1).await; + + let op = Operator::new(services::Memory::default()) + .expect("operator must build") + .layer(layer) + .finish(); + + let blocked = timeout(Duration::from_millis(50), op.stat("any")).await; + assert!( + blocked.is_err(), + "operation should be limited by shared semaphore" + ); + + drop(permit); + + let completed = timeout(Duration::from_millis(50), op.stat("any")).await; + assert!( + completed.is_ok(), + "operation should proceed once permit is released" + ); + } + + #[tokio::test] + async fn custom_semaphore_is_honored() { + let custom = SimpleSemaphore::new(1); + let layer = ConcurrentLimitLayer::with_semaphore(custom); + + let op = Operator::new(services::Memory::default()) + .expect("operator must build") + .layer(layer) + .finish(); + + let _l = op.lister("").await.expect("list should start"); + + let blocked = timeout(Duration::from_millis(50), op.stat("any")).await; + assert!( + blocked.is_err(), + "operation should be limited by custom semaphore" + ); + } +}
