Xuanwo commented on code in PR #6618:
URL: https://github.com/apache/opendal/pull/6618#discussion_r2567276751


##########
bindings/java/src/layer.rs:
##########
@@ -59,6 +59,6 @@ pub extern "system" fn 
Java_org_apache_opendal_layer_ConcurrentLimitLayer_doLaye
     permits: jlong,
 ) -> jlong {
     let op = unsafe { &*op };
-    let concurrent_limit = ConcurrentLimitLayer::new(permits as usize);
+    let concurrent_limit = ConcurrentLimitLayer::with_permits(permits as 
usize);

Review Comment:
   I don't think we need to this change this API.



##########
core/src/layers/concurrent_limit.rs:
##########
@@ -15,20 +15,52 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
 
 use futures::Stream;
 use futures::StreamExt;
-use tokio::sync::OwnedSemaphorePermit;
+use std::any::Any;
+use std::future::Future;
 use tokio::sync::Semaphore;
 
 use crate::raw::*;
 use crate::*;
 
+/// ConcurrencySemaphore abstracts a semaphore-like concurrency primitive
+/// that yields an owned permit released on drop. It mirrors RetryLayer's
+/// interceptor pattern by serving as a generic extension point for the layer.
+pub trait ConcurrencySemaphore: Send + Sync + Any + Clone + 'static {

Review Comment:
   How about naming it after the layer like `ConcurrentLimitSemaphore`?



##########
core/src/layers/concurrent_limit.rs:
##########
@@ -81,41 +121,78 @@ use crate::*;
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ConcurrentLimitLayer {
-    operation_semaphore: Arc<Semaphore>,
-    http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrencySemaphore = TokioSemaphore> {
+    operation_semaphore: Arc<S>,
+    http_semaphore: Option<Arc<S>>,
 }
 
-impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<TokioSemaphore> {
+    /// Create a new `ConcurrentLimitLayer` with a custom semaphore.
     ///
-    /// This permits will applied to all operations.
-    pub fn new(permits: usize) -> Self {
+    /// The provided semaphore will be shared by every operator wrapped by this
+    /// layer, giving callers full control over its configuration.
+    pub fn new(operation_semaphore: Arc<Semaphore>) -> Self {
+        
Self::with_semaphore(Arc::new(TokioSemaphore::new(operation_semaphore)))
+    }
+
+    // with_semaphore is provided by the generic impl below for the default 
Tokio adapter.
+
+    /// Create a new `ConcurrentLimitLayer` with the specified number of
+    /// permits.
+    pub fn with_permits(permits: usize) -> Self {
+        Self::new(Arc::new(Semaphore::new(permits)))
+    }

Review Comment:
   We can remove this API.



##########
core/src/layers/concurrent_limit.rs:
##########
@@ -81,41 +121,78 @@ use crate::*;
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ConcurrentLimitLayer {
-    operation_semaphore: Arc<Semaphore>,
-    http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrencySemaphore = TokioSemaphore> {
+    operation_semaphore: Arc<S>,
+    http_semaphore: Option<Arc<S>>,
 }
 
-impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<TokioSemaphore> {
+    /// Create a new `ConcurrentLimitLayer` with a custom semaphore.
     ///
-    /// This permits will applied to all operations.
-    pub fn new(permits: usize) -> Self {
+    /// The provided semaphore will be shared by every operator wrapped by this
+    /// layer, giving callers full control over its configuration.
+    pub fn new(operation_semaphore: Arc<Semaphore>) -> Self {

Review Comment:
   Please don't change public API unless we have to.



##########
core/src/layers/concurrent_limit.rs:
##########
@@ -81,41 +121,78 @@ use crate::*;
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ConcurrentLimitLayer {
-    operation_semaphore: Arc<Semaphore>,
-    http_semaphore: Option<Arc<Semaphore>>,
+pub struct ConcurrentLimitLayer<S: ConcurrencySemaphore = TokioSemaphore> {
+    operation_semaphore: Arc<S>,
+    http_semaphore: Option<Arc<S>>,
 }
 
-impl ConcurrentLimitLayer {
-    /// Create a new ConcurrentLimitLayer will specify permits.
+impl ConcurrentLimitLayer<TokioSemaphore> {
+    /// Create a new `ConcurrentLimitLayer` with a custom semaphore.
     ///
-    /// This permits will applied to all operations.
-    pub fn new(permits: usize) -> Self {
+    /// The provided semaphore will be shared by every operator wrapped by this
+    /// layer, giving callers full control over its configuration.
+    pub fn new(operation_semaphore: Arc<Semaphore>) -> Self {
+        
Self::with_semaphore(Arc::new(TokioSemaphore::new(operation_semaphore)))
+    }
+
+    // with_semaphore is provided by the generic impl below for the default 
Tokio adapter.
+
+    /// Create a new `ConcurrentLimitLayer` with the specified number of
+    /// permits.
+    pub fn with_permits(permits: usize) -> Self {
+        Self::new(Arc::new(Semaphore::new(permits)))
+    }
+
+    /// Set a concurrent limit for HTTP requests.
+    ///
+    /// This convenience helper constructs a new semaphore with the specified
+    /// number of permits and calls 
[`ConcurrentLimitLayer::with_http_semaphore`].
+    /// Use [`ConcurrentLimitLayer::with_http_semaphore`] directly when reusing
+    /// a shared semaphore.
+    pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
+        self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
+    }
+
+    /// Provide a custom semaphore to limit HTTP request concurrency.
+    ///
+    /// Sharing the same semaphore across layers allows coordinating HTTP
+    /// usage with other parts of the application.
+    pub fn with_http_semaphore(mut self, semaphore: Arc<Semaphore>) -> Self {
+        self.http_semaphore = Some(Arc::new(TokioSemaphore::new(semaphore)));
+        self
+    }
+
+    // with_http_concurrency is provided by the generic impl below.
+}
+
+impl<S: ConcurrencySemaphore> ConcurrentLimitLayer<S> {
+    /// Create a layer with any ConcurrencySemaphore implementation.
+    pub fn with_semaphore(operation_semaphore: Arc<S>) -> Self {
         Self {
-            operation_semaphore: Arc::new(Semaphore::new(permits)),
+            operation_semaphore,
             http_semaphore: None,
         }
     }
 
-    /// Set a concurrent limit for HTTP requests.
-    ///
-    /// This will limit the number of concurrent HTTP requests made by the
-    /// operator.
-    pub fn with_http_concurrent_limit(mut self, permits: usize) -> Self {
-        self.http_semaphore = Some(Arc::new(Semaphore::new(permits)));
+    /// Provide a custom HTTP concurrency semaphore instance.
+    pub fn with_http_concurrency(mut self, semaphore: Arc<S>) -> Self {
+        self.http_semaphore = Some(semaphore);

Review Comment:
   It's better to always follow the same API naming pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to