Copilot commented on code in PR #7082:
URL: https://github.com/apache/opendal/pull/7082#discussion_r2638989898
##########
core/layers/concurrent-limit/src/lib.rs:
##########
@@ -287,3 +350,126 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
self.inner.close().await
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use opendal_core::Operator;
+ use opendal_core::services;
+ use std::collections::VecDeque;
+ use std::sync::Arc;
+ use std::sync::Mutex as StdMutex;
+ use std::time::Duration;
+ use tokio::time::timeout;
+
+ use futures::channel::oneshot;
+
+ // A minimal non-mea semaphore for testing the trait abstraction.
+ #[derive(Clone)]
+ struct SimpleSemaphore {
+ state: Arc<StdMutex<SimpleState>>,
+ }
+
+ struct SimpleState {
+ available: usize,
+ waiters: VecDeque<oneshot::Sender<()>>,
+ }
+
+ impl SimpleSemaphore {
+ fn new(permits: usize) -> Self {
+ Self {
+ state: Arc::new(StdMutex::new(SimpleState {
+ available: permits,
+ waiters: VecDeque::new(),
+ })),
+ }
+ }
+ }
+
+ struct SimplePermit(Arc<StdMutex<SimpleState>>);
+
+ impl Drop for SimplePermit {
+ fn drop(&mut self) {
+ let mut st = self.0.lock().expect("mutex poisoned");
+ st.available += 1;
+ if let Some(tx) = st.waiters.pop_front() {
+ let _ = tx.send(());
+ }
+ }
+ }
+
+ impl ConcurrentLimitSemaphore for SimpleSemaphore {
+ type Permit = SimplePermit;
+
+ async fn acquire(&self) -> Self::Permit {
+ let state = self.state.clone();
+ loop {
+ let acquired = {
+ let mut st = state.lock().expect("mutex poisoned");
+ if st.available > 0 {
+ st.available -= 1;
+ true
+ } else {
+ false
+ }
+ };
+ if acquired {
+ return SimplePermit(state.clone());
+ }
+
+ let (tx, rx) = oneshot::channel();
+ {
+ let mut st = state.lock().expect("mutex poisoned");
+ st.waiters.push_back(tx);
+ }
+ let _ = rx.await;
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn operation_semaphore_can_be_shared() {
+ let semaphore = Arc::new(Semaphore::new(1));
+ let layer = ConcurrentLimitLayer::with_semaphore(semaphore.clone());
+
+ let permit = semaphore.clone().acquire_owned(1).await;
+
+ let op = Operator::new(services::Memory::default())
+ .expect("operator must build")
+ .layer(layer)
+ .finish();
+
+ let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+ assert!(
+ blocked.is_err(),
+ "operation should be limited by shared semaphore"
+ );
+
+ drop(permit);
+
+ let completed = timeout(Duration::from_millis(50), op.stat("any")).await;
+ assert!(
+ completed.is_ok(),
+ "operation should proceed once permit is released"
+ );
+ }
+
+ #[tokio::test]
+ async fn custom_semaphore_is_honored() {
+ let custom = SimpleSemaphore::new(1);
+ let layer = ConcurrentLimitLayer::with_semaphore(custom);
+
+ let op = Operator::new(services::Memory::default())
+ .expect("operator must build")
+ .layer(layer)
+ .finish();
+
+ let _l = op.lister("").await.expect("list should start");
+
+ let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
+ assert!(
+ blocked.is_err(),
+ "operation should be limited by custom semaphore"
+ );
+ }
+}
Review Comment:
The tests only verify operation-level semaphore limiting but don't test the
HTTP-level semaphore functionality introduced by `with_http_semaphore` and
`with_http_concurrent_limit`. Consider adding a test that verifies HTTP request
limiting works correctly when configured.
##########
core/layers/concurrent-limit/src/lib.rs:
##########
@@ -81,41 +99,67 @@ use opendal_core::*;
/// # }
/// ```
#[derive(Clone)]
-pub struct ConcurrentLimitLayer {
- operation_semaphore: Arc<Semaphore>,
- http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
+ operation_semaphore: S,
+ http_semaphore: Option<S>,
}
-impl ConcurrentLimitLayer {
- /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<Arc<Semaphore>> {
+ /// Create a new `ConcurrentLimitLayer` with the specified number of
+ /// permits.
///
- /// This permits will applied to all operations.
+ /// These permits will be applied to all operations.
pub fn new(permits: usize) -> Self {
+ Self::with_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+
+ /// Set a concurrent limit for HTTP requests.
+ ///
+ /// This convenience helper constructs a new semaphore with the specified
+ /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`].
+ /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+ /// a shared semaphore.
+ pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+ self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+}
+
+impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
+ /// Create a layer with any ConcurrentLimitSemaphore implementation.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use mea::semaphore::Semaphore;
+ /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
+ /// let semaphore = Arc::new(Semaphore::new(1024));
+ /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore);
+ /// ```
+ pub fn with_semaphore(operation_semaphore: S) -> Self {
Self {
- operation_semaphore: Arc::new(Semaphore::new(permits)),
+ operation_semaphore,
http_semaphore: None,
}
}
- /// Set a concurrent limit for HTTP requests.
- ///
- /// This will limit the number of concurrent HTTP requests made by the
- /// operator.
- pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
- self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+ /// Provide a custom HTTP concurrency semaphore instance.
Review Comment:
The `with_http_semaphore` method documentation is minimal. It should include
more detail explaining its purpose (limiting HTTP request concurrency), when to
use it versus `with_http_concurrent_limit`, and ideally include a usage example
similar to `with_semaphore`.
```suggestion
/// Provide a custom HTTP concurrency semaphore instance.
///
/// The semaphore configured here is used to limit the number of
/// in-flight HTTP requests issued by the underlying [`HttpClient`].
/// It is independent from the operation-level semaphore passed to
/// [`ConcurrentLimitLayer::with_semaphore`], which controls the
/// concurrency of high-level operations (such as reads or writes).
///
/// Use this method when you need to **reuse a shared semaphore**
/// across multiple `ConcurrentLimitLayer` instances or services.
/// For simple, per-layer HTTP concurrency limits, prefer
/// [`ConcurrentLimitLayer::with_http_concurrent_limit`], which
/// constructs a new semaphore for you.
///
/// # Examples
///
/// Reuse a shared HTTP semaphore for several layers:
///
/// ```
/// # use std::sync::Arc;
/// # use mea::semaphore::Semaphore;
/// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
/// let op_semaphore = Arc::new(Semaphore::new(1024));
/// let http_semaphore = Arc::new(Semaphore::new(64));
///
/// let layer = ConcurrentLimitLayer::with_semaphore(op_semaphore)
/// .with_http_semaphore(http_semaphore.clone());
/// ```
```
##########
core/layers/concurrent-limit/src/lib.rs:
##########
@@ -287,3 +350,126 @@ impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
self.inner.close().await
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use opendal_core::Operator;
+ use opendal_core::services;
+ use std::collections::VecDeque;
+ use std::sync::Arc;
+ use std::sync::Mutex as StdMutex;
+ use std::time::Duration;
+ use tokio::time::timeout;
+
+ use futures::channel::oneshot;
+
+ // A minimal non-mea semaphore for testing the trait abstraction.
+ #[derive(Clone)]
+ struct SimpleSemaphore {
+ state: Arc<StdMutex<SimpleState>>,
+ }
+
+ struct SimpleState {
+ available: usize,
+ waiters: VecDeque<oneshot::Sender<()>>,
+ }
+
+ impl SimpleSemaphore {
+ fn new(permits: usize) -> Self {
+ Self {
+ state: Arc::new(StdMutex::new(SimpleState {
+ available: permits,
+ waiters: VecDeque::new(),
+ })),
+ }
+ }
+ }
+
+ struct SimplePermit(Arc<StdMutex<SimpleState>>);
+
+ impl Drop for SimplePermit {
+ fn drop(&mut self) {
+ let mut st = self.0.lock().expect("mutex poisoned");
+ st.available += 1;
+ if let Some(tx) = st.waiters.pop_front() {
+ let _ = tx.send(());
+ }
+ }
+ }
+
+ impl ConcurrentLimitSemaphore for SimpleSemaphore {
+ type Permit = SimplePermit;
+
+ async fn acquire(&self) -> Self::Permit {
+ let state = self.state.clone();
+ loop {
+ let acquired = {
+ let mut st = state.lock().expect("mutex poisoned");
+ if st.available > 0 {
+ st.available -= 1;
+ true
+ } else {
+ false
+ }
+ };
+ if acquired {
+ return SimplePermit(state.clone());
+ }
+
+ let (tx, rx) = oneshot::channel();
+ {
+ let mut st = state.lock().expect("mutex poisoned");
+ st.waiters.push_back(tx);
+ }
+ let _ = rx.await;
+ }
+ }
Review Comment:
The `SimpleSemaphore::acquire` implementation has a race condition. The
availability check (line 409) and the waiter registration (line 423) lock the
mutex separately, so between the two another thread can release a permit, find
the waiter queue still empty, and wake nobody. The result is a lost wakeup: the
acquiring task then waits indefinitely even though a permit is available.
The check-and-register pattern should be atomic: if no permits are
available, the waiter should be registered before releasing the lock, and
availability should then be re-checked after registration to handle the race.
##########
core/layers/concurrent-limit/src/lib.rs:
##########
@@ -29,6 +30,25 @@ use mea::semaphore::Semaphore;
use opendal_core::raw::*;
use opendal_core::*;
+/// ConcurrentLimitSemaphore abstracts a semaphore-like concurrency primitive
+/// that yields an owned permit released on drop.
+pub trait ConcurrentLimitSemaphore: Send + Sync + Any + Clone + Unpin + 'static {
Review Comment:
The `Any` trait bound on `ConcurrentLimitSemaphore` appears unnecessary. The
trait doesn't perform any downcasting operations or use `type_id()`. Unless
there's a specific reason for future extensibility, this bound can be removed
to simplify the trait constraints.
##########
core/layers/concurrent-limit/src/lib.rs:
##########
@@ -81,41 +99,67 @@ use opendal_core::*;
/// # }
/// ```
#[derive(Clone)]
-pub struct ConcurrentLimitLayer {
- operation_semaphore: Arc<Semaphore>,
- http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
+ operation_semaphore: S,
+ http_semaphore: Option<S>,
}
-impl ConcurrentLimitLayer {
- /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<Arc<Semaphore>> {
+ /// Create a new `ConcurrentLimitLayer` with the specified number of
+ /// permits.
///
- /// This permits will applied to all operations.
+ /// These permits will be applied to all operations.
pub fn new(permits: usize) -> Self {
+ Self::with_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+
+ /// Set a concurrent limit for HTTP requests.
+ ///
+ /// This convenience helper constructs a new semaphore with the specified
+ /// number of permits and calls [`ConcurrentLimitLayer::with_http_semaphore`].
+ /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+ /// a shared semaphore.
+ pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+ self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+ }
+}
+
+impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
+ /// Create a layer with any ConcurrentLimitSemaphore implementation.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use mea::semaphore::Semaphore;
+ /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
+ /// let semaphore = Arc::new(Semaphore::new(1024));
+ /// let _layer = ConcurrentLimitLayer::with_semaphore(semaphore);
+ /// ```
+ pub fn with_semaphore(operation_semaphore: S) -> Self {
Self {
- operation_semaphore: Arc::new(Semaphore::new(permits)),
+ operation_semaphore,
http_semaphore: None,
}
}
- /// Set a concurrent limit for HTTP requests.
- ///
- /// This will limit the number of concurrent HTTP requests made by the
- /// operator.
- pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
- self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+ /// Provide a custom HTTP concurrency semaphore instance.
+ pub fn with_http_semaphore(mut self, semaphore: S) -> Self {
+ self.http_semaphore = Some(semaphore);
self
}
}
-impl<A: Access> Layer<A> for ConcurrentLimitLayer {
- type LayeredAccess = ConcurrentLimitAccessor<A>;
+impl<A: Access, S: ConcurrentLimitSemaphore> Layer<A> for ConcurrentLimitLayer<S>
+where
+ S::Permit: Unpin,
+{
+ type LayeredAccess = ConcurrentLimitAccessor<A, S>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
let info = inner.info();
// Update http client with metrics http fetcher.
Review Comment:
This comment incorrectly references "metrics http fetcher" when it should
say "concurrent limit http fetcher". The comment appears to be copied from
another layer and wasn't updated to match the current context.
```suggestion
// Update http client with concurrent limit http fetcher.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]