This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new e8cbfbfae feat(bindings/nodejs): add concurrent limit layer (#6739)
e8cbfbfae is described below

commit e8cbfbfae9a97bdbd6f0269fed33b6710effb014
Author: Kingsword <[email protected]>
AuthorDate: Mon Oct 27 17:02:30 2025 +0800

    feat(bindings/nodejs): add concurrent limit layer (#6739)
    
    * feat(bindings/nodejs): add ConcurrentLimitLayer
    
    * add test
    
    * format code
    
    * ci
    
    * fix test
    
    * ci
---
 .github/workflows/ci_bindings_nodejs.yml     |  10 +-
 bindings/nodejs/generated.d.ts               |  61 ++++++-
 bindings/nodejs/generated.js                 |   1 +
 bindings/nodejs/index.cjs                    |  11 +-
 bindings/nodejs/index.mjs                    |   3 +-
 bindings/nodejs/src/layer.rs                 | 235 +++++++++++++++++++++++++++
 bindings/nodejs/src/lib.rs                   | 133 +--------------
 bindings/nodejs/tests/suites/index.mjs       |   2 +
 bindings/nodejs/tests/suites/layer.suite.mjs |  55 +++++++
 9 files changed, 375 insertions(+), 136 deletions(-)

diff --git a/.github/workflows/ci_bindings_nodejs.yml 
b/.github/workflows/ci_bindings_nodejs.yml
index 3fb3287e1..2bd6bfa1f 100644
--- a/.github/workflows/ci_bindings_nodejs.yml
+++ b/.github/workflows/ci_bindings_nodejs.yml
@@ -40,6 +40,10 @@ on:
       - ".github/workflows/ci_bindings_nodejs.yml"
   workflow_dispatch:
 
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
 jobs:
   test:
     runs-on: ubuntu-latest
@@ -72,6 +76,9 @@ jobs:
       - name: Check format
         run: pnpm exec prettier --check .
 
+      - name: Check Clippy
+        run: cargo clippy -- -D warnings
+
       - name: Build
         run: pnpm build
 
@@ -80,6 +87,3 @@ jobs:
 
       - name: Test
         run: cargo test --no-fail-fast
-
-      - name: Check Clippy
-        run: cargo clippy -- -D warnings
diff --git a/bindings/nodejs/generated.d.ts b/bindings/nodejs/generated.d.ts
index ab99e28dc..c2219d529 100644
--- a/bindings/nodejs/generated.d.ts
+++ b/bindings/nodejs/generated.d.ts
@@ -205,6 +205,65 @@ export declare class Capability {
   get shared(): boolean
 }
 
+/**
+ * Concurrent limit layer
+ *
+ * Add concurrent request limit.
+ *
+ * # Notes
+ *
+ * Users can control how many concurrent connections could be established
+ * between OpenDAL and underlying storage services.
+ *
+ * All operators wrapped by this layer will share a common semaphore.
+ *
+ * # Examples
+ *
+ * ```javascript
+ * const op = new Operator("fs", { root: "/tmp" })
+ *
+ * // Create a concurrent limit layer with 1024 permits
+ * const limit = new ConcurrentLimitLayer(1024);
+ * op.layer(limit.build());
+ * ```
+ *
+ * With HTTP concurrent limit:
+ *
+ * ```javascript
+ * const limit = new ConcurrentLimitLayer(1024);
+ * limit.httpPermits = 512;
+ * op.layer(limit.build());
+ * ```
+ */
+export declare class ConcurrentLimitLayer {
+  /**
+   * Create a new ConcurrentLimitLayer with specified permits.
+   *
+   * This permits will be applied to all operations.
+   *
+   * # Arguments
+   *
+   * * `permits` - The maximum number of concurrent operations allowed.
+   */
+  constructor(permits: number)
+  /**
+   * Set a concurrent limit for HTTP requests.
+   *
+   * This will limit the number of concurrent HTTP requests made by the 
operator.
+   *
+   * # Arguments
+   *
+   * * `v` - The maximum number of concurrent HTTP requests allowed.
+   */
+  set httpPermits(v: number)
+  /**
+   * Build the layer.
+   *
+   * Returns an `External<Layer>` that can be used with `Operator.layer()`.
+   */
+  build(): ExternalObject<Layer>
+}
+
 /** Entry returned by Lister or BlockingLister to represent a path, and it's a 
relative metadata. */
 export declare class Entry {
   /** Return the path of this entry. */
@@ -702,7 +761,7 @@ export declare class Reader {
  * # Examples
  *
  * ```javascript
- * const op = new Operator("file", { root: "/tmp" })
+ * const op = new Operator("fs", { root: "/tmp" })
  *
  * const retry = new RetryLayer();
  * retry.max_times = 3;
diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js
index e019e715f..a937b9d68 100644
--- a/bindings/nodejs/generated.js
+++ b/bindings/nodejs/generated.js
@@ -531,6 +531,7 @@ module.exports.BlockingLister = nativeBinding.BlockingLister
 module.exports.BlockingReader = nativeBinding.BlockingReader
 module.exports.BlockingWriter = nativeBinding.BlockingWriter
 module.exports.Capability = nativeBinding.Capability
+module.exports.ConcurrentLimitLayer = nativeBinding.ConcurrentLimitLayer
 module.exports.Entry = nativeBinding.Entry
 module.exports.Layer = nativeBinding.Layer
 module.exports.Lister = nativeBinding.Lister
diff --git a/bindings/nodejs/index.cjs b/bindings/nodejs/index.cjs
index 9b17b4685..f976edc82 100644
--- a/bindings/nodejs/index.cjs
+++ b/bindings/nodejs/index.cjs
@@ -119,7 +119,15 @@ class BlockingWriteStream extends Writable {
   }
 }
 
-const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer } 
= require('./generated.js')
+const {
+  Operator,
+  RetryLayer,
+  ConcurrentLimitLayer,
+  BlockingReader,
+  Reader,
+  BlockingWriter,
+  Writer,
+} = require('./generated.js')
 
 BlockingReader.prototype.createReadStream = function (options) {
   return new BlockingReadStream(this, options)
@@ -140,4 +148,5 @@ Writer.prototype.createWriteStream = function (options) {
 module.exports.Operator = Operator
 module.exports.layers = {
   RetryLayer,
+  ConcurrentLimitLayer,
 }
diff --git a/bindings/nodejs/index.mjs b/bindings/nodejs/index.mjs
index ccafda569..1eac4ec77 100644
--- a/bindings/nodejs/index.mjs
+++ b/bindings/nodejs/index.mjs
@@ -120,7 +120,7 @@ class BlockingWriteStream extends Writable {
 }
 
 import * as generated from './generated.js'
-const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer } 
= generated
+const { Operator, RetryLayer, ConcurrentLimitLayer, BlockingReader, Reader, 
BlockingWriter, Writer } = generated
 
 BlockingReader.prototype.createReadStream = function (options) {
   return new BlockingReadStream(this, options)
@@ -140,6 +140,7 @@ Writer.prototype.createWriteStream = function (options) {
 
 export const layers = {
   RetryLayer,
+  ConcurrentLimitLayer,
 }
 
 export * from './generated.js'
diff --git a/bindings/nodejs/src/layer.rs b/bindings/nodejs/src/layer.rs
new file mode 100644
index 000000000..80b2c65d7
--- /dev/null
+++ b/bindings/nodejs/src/layer.rs
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::time::Duration;
+
+use napi::bindgen_prelude::External;
+
+pub trait NodeLayer: Send + Sync {
+    fn layer(&self, op: opendal::Operator) -> opendal::Operator;
+}
+
+/// A public layer wrapper
+#[napi]
+pub struct Layer {
+    pub(crate) inner: Box<dyn NodeLayer>,
+}
+
+impl NodeLayer for opendal::layers::RetryLayer {
+    fn layer(&self, op: opendal::Operator) -> opendal::Operator {
+        op.layer(self.clone())
+    }
+}
+
+/// Retry layer
+///
+/// Add retry for temporary failed operations.
+///
+/// # Notes
+///
+/// This layer will retry failed operations when [`Error::is_temporary`]
+/// returns true.
+/// If the operation still failed, this layer will set error to
+/// `Persistent` which means error has been retried.
+///
+/// `write` and `blocking_write` don't support retry so far,
+/// visit [this issue](https://github.com/apache/opendal/issues/1223) for more 
details.
+///
+/// # Examples
+///
+/// ```javascript
+/// const op = new Operator("fs", { root: "/tmp" })
+///
+/// const retry = new RetryLayer();
+/// retry.max_times = 3;
+/// retry.jitter = true;
+///
+/// op.layer(retry.build());
+/// ```
+#[derive(Default)]
+#[napi]
+pub struct RetryLayer {
+    jitter: bool,
+    max_times: Option<u32>,
+    factor: Option<f64>,
+    max_delay: Option<f64>,
+    min_delay: Option<f64>,
+}
+
+#[napi]
+impl RetryLayer {
+    #[napi(constructor)]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set jitter of current backoff.
+    ///
+    /// If jitter is enabled, ExponentialBackoff will add a random jitter in 
`[0, min_delay)`
+    /// to current delay.
+    #[napi(setter)]
+    pub fn jitter(&mut self, v: bool) {
+        self.jitter = v;
+    }
+
+    /// Set max_times of current backoff.
+    ///
+    /// Backoff will return `None` if max times are reached.
+    #[napi(setter)]
+    pub fn max_times(&mut self, v: u32) {
+        self.max_times = Some(v);
+    }
+
+    /// Set factor of current backoff.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if the input factor is smaller than `1.0`.
+    #[napi(setter)]
+    pub fn factor(&mut self, v: f64) {
+        self.factor = Some(v);
+    }
+
+    /// Set max_delay of current backoff.
+    ///
+    /// Delay will not increase if the current delay is larger than max_delay.
+    ///
+    /// # Notes
+    ///
+    /// - The unit of max_delay is millisecond.
+    #[napi(setter)]
+    pub fn max_delay(&mut self, v: f64) {
+        self.max_delay = Some(v);
+    }
+
+    /// Set min_delay of current backoff.
+    ///
+    /// # Notes
+    ///
+    /// - The unit of min_delay is millisecond.
+    #[napi(setter)]
+    pub fn min_delay(&mut self, v: f64) {
+        self.min_delay = Some(v);
+    }
+
+    #[napi]
+    pub fn build(&self) -> External<Layer> {
+        let mut l = opendal::layers::RetryLayer::default();
+        if self.jitter {
+            l = l.with_jitter();
+        }
+        if let Some(max_times) = self.max_times {
+            l = l.with_max_times(max_times as usize);
+        }
+        if let Some(factor) = self.factor {
+            l = l.with_factor(factor as f32);
+        }
+        if let Some(max_delay) = self.max_delay {
+            l = l.with_max_delay(Duration::from_millis(max_delay as u64));
+        }
+        if let Some(min_delay) = self.min_delay {
+            l = l.with_min_delay(Duration::from_millis(min_delay as u64));
+        }
+
+        External::new(Layer { inner: Box::new(l) })
+    }
+}
+
+impl NodeLayer for opendal::layers::ConcurrentLimitLayer {
+    fn layer(&self, op: opendal::Operator) -> opendal::Operator {
+        op.layer(self.clone())
+    }
+}
+
+/// Concurrent limit layer
+///
+/// Add concurrent request limit.
+///
+/// # Notes
+///
+/// Users can control how many concurrent connections could be established
+/// between OpenDAL and underlying storage services.
+///
+/// All operators wrapped by this layer will share a common semaphore.
+///
+/// # Examples
+///
+/// ```javascript
+/// const op = new Operator("fs", { root: "/tmp" })
+///
+/// // Create a concurrent limit layer with 1024 permits
+/// const limit = new ConcurrentLimitLayer(1024);
+/// op.layer(limit.build());
+/// ```
+///
+/// With HTTP concurrent limit:
+///
+/// ```javascript
+/// const limit = new ConcurrentLimitLayer(1024);
+/// limit.httpPermits = 512;
+/// op.layer(limit.build());
+/// ```
+#[napi]
+pub struct ConcurrentLimitLayer {
+    permits: i64,
+    http_permits: Option<i64>,
+}
+
+#[napi]
+impl ConcurrentLimitLayer {
+    /// Create a new ConcurrentLimitLayer with specified permits.
+    ///
+    /// This permits will be applied to all operations.
+    ///
+    /// # Arguments
+    ///
+    /// * `permits` - The maximum number of concurrent operations allowed.
+    #[napi(constructor)]
+    pub fn new(permits: i64) -> Self {
+        Self {
+            permits,
+            http_permits: None,
+        }
+    }
+
+    /// Set a concurrent limit for HTTP requests.
+    ///
+    /// This will limit the number of concurrent HTTP requests made by the 
operator.
+    ///
+    /// # Arguments
+    ///
+    /// * `v` - The maximum number of concurrent HTTP requests allowed.
+    #[napi(setter)]
+    pub fn http_permits(&mut self, v: i64) {
+        self.http_permits = Some(v);
+    }
+
+    /// Build the layer.
+    ///
+    /// Returns an `External<Layer>` that can be used with `Operator.layer()`.
+    #[napi]
+    pub fn build(&self) -> External<Layer> {
+        let permits = self.permits;
+        let mut l = opendal::layers::ConcurrentLimitLayer::new(permits as 
usize);
+
+        if let Some(http_permits) = self.http_permits {
+            l = l.with_http_concurrent_limit(http_permits as usize);
+        }
+
+        External::new(Layer { inner: Box::new(l) })
+    }
+}
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 736bc1e9e..932539ea6 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -31,7 +31,10 @@ use opendal::options::{
     DeleteOptions, ListOptions, ReadOptions, ReaderOptions, StatOptions, 
WriteOptions,
 };
 
+use crate::layer::Layer;
+
 mod capability;
+mod layer;
 mod options;
 
 #[napi]
@@ -1052,16 +1055,6 @@ impl PresignedRequest {
     }
 }
 
-pub trait NodeLayer: Send + Sync {
-    fn layer(&self, op: opendal::Operator) -> opendal::Operator;
-}
-
-/// A public layer wrapper
-#[napi]
-pub struct Layer {
-    inner: Box<dyn NodeLayer>,
-}
-
 #[napi]
 impl Operator {
     /// Add a layer to this operator.
@@ -1079,126 +1072,6 @@ impl Operator {
     }
 }
 
-impl NodeLayer for opendal::layers::RetryLayer {
-    fn layer(&self, op: opendal::Operator) -> opendal::Operator {
-        op.layer(self.clone())
-    }
-}
-
-/// Retry layer
-///
-/// Add retry for temporary failed operations.
-///
-/// # Notes
-///
-/// This layer will retry failed operations when [`Error::is_temporary`]
-/// returns true.
-/// If the operation still failed, this layer will set error to
-/// `Persistent` which means error has been retried.
-///
-/// `write` and `blocking_write` don't support retry so far,
-/// visit [this issue](https://github.com/apache/opendal/issues/1223) for more 
details.
-///
-/// # Examples
-///
-/// ```javascript
-/// const op = new Operator("file", { root: "/tmp" })
-///
-/// const retry = new RetryLayer();
-/// retry.max_times = 3;
-/// retry.jitter = true;
-///
-/// op.layer(retry.build());
-/// ```
-#[derive(Default)]
-#[napi]
-pub struct RetryLayer {
-    jitter: bool,
-    max_times: Option<u32>,
-    factor: Option<f64>,
-    max_delay: Option<f64>,
-    min_delay: Option<f64>,
-}
-
-#[napi]
-impl RetryLayer {
-    #[napi(constructor)]
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    /// Set jitter of current backoff.
-    ///
-    /// If jitter is enabled, ExponentialBackoff will add a random jitter in 
`[0, min_delay)`
-    /// to current delay.
-    #[napi(setter)]
-    pub fn jitter(&mut self, v: bool) {
-        self.jitter = v;
-    }
-
-    /// Set max_times of current backoff.
-    ///
-    /// Backoff will return `None` if max times are reached.
-    #[napi(setter)]
-    pub fn max_times(&mut self, v: u32) {
-        self.max_times = Some(v);
-    }
-
-    /// Set factor of current backoff.
-    ///
-    /// # Panics
-    ///
-    /// This function will panic if the input factor is smaller than `1.0`.
-    #[napi(setter)]
-    pub fn factor(&mut self, v: f64) {
-        self.factor = Some(v);
-    }
-
-    /// Set max_delay of current backoff.
-    ///
-    /// Delay will not increase if the current delay is larger than max_delay.
-    ///
-    /// # Notes
-    ///
-    /// - The unit of max_delay is millisecond.
-    #[napi(setter)]
-    pub fn max_delay(&mut self, v: f64) {
-        self.max_delay = Some(v);
-    }
-
-    /// Set min_delay of current backoff.
-    ///
-    /// # Notes
-    ///
-    /// - The unit of min_delay is millisecond.
-    #[napi(setter)]
-    pub fn min_delay(&mut self, v: f64) {
-        self.min_delay = Some(v);
-    }
-
-    #[napi]
-    pub fn build(&self) -> External<Layer> {
-        let mut l = opendal::layers::RetryLayer::default();
-        if self.jitter {
-            l = l.with_jitter();
-        }
-        if let Some(max_times) = self.max_times {
-            l = l.with_max_times(max_times as usize);
-        }
-        if let Some(factor) = self.factor {
-            l = l.with_factor(factor as f32);
-        }
-        if let Some(max_delay) = self.max_delay {
-            l = l.with_max_delay(Duration::from_millis(max_delay as u64));
-        }
-        if let Some(min_delay) = self.min_delay {
-            l = l.with_min_delay(Duration::from_millis(min_delay as u64));
-        }
-
-        External::new(Layer { inner: Box::new(l) })
-    }
-}
-
 /// Format opendal error to napi error.
 ///
 /// FIXME: handle error correctly.
diff --git a/bindings/nodejs/tests/suites/index.mjs 
b/bindings/nodejs/tests/suites/index.mjs
index f7848c2dd..791b15c48 100644
--- a/bindings/nodejs/tests/suites/index.mjs
+++ b/bindings/nodejs/tests/suites/index.mjs
@@ -24,6 +24,7 @@ import { checkRandomRootEnabled, generateRandomRoot, 
loadConfigFromEnv } from '.
 import { run as AsyncIOTestRun } from './async.suite.mjs'
 import { run as ServicesTestRun } from './services.suite.mjs'
 import { run as SyncIOTestRun } from './sync.suite.mjs'
+import { run as LayerTestRun } from './layer.suite.mjs'
 import { run as AsyncStatOptionsTestRun } from './asyncStatOptions.suite.mjs'
 import { run as SyncStatOptionsTestRun } from './syncStatOptions.suite.mjs'
 import { run as AsyncReadOptionsTestRun } from './asyncReadOptions.suite.mjs'
@@ -63,6 +64,7 @@ export function runner(testName, scheme) {
     AsyncIOTestRun(operator)
     ServicesTestRun(operator)
     SyncIOTestRun(operator)
+    LayerTestRun(operator)
     AsyncStatOptionsTestRun(operator)
     SyncStatOptionsTestRun(operator)
     AsyncReadOptionsTestRun(operator)
diff --git a/bindings/nodejs/tests/suites/layer.suite.mjs 
b/bindings/nodejs/tests/suites/layer.suite.mjs
new file mode 100644
index 000000000..dc9f0307c
--- /dev/null
+++ b/bindings/nodejs/tests/suites/layer.suite.mjs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { test, assert } from 'vitest'
+
+import { RetryLayer, ConcurrentLimitLayer } from '../../index.mjs'
+
+/**
+ * @param {import("../../index").Operator} op
+ */
+export function run(op) {
+  test('test operator with retry layer', () => {
+    const retryLayer = new RetryLayer()
+    retryLayer.maxTimes = 3
+    retryLayer.jitter = true
+
+    const layerOp = op.layer(retryLayer.build())
+    assert.ok(layerOp)
+    assert.ok(layerOp.capability())
+  })
+
+  test('test operator with concurrent limit layer', () => {
+    const concurrentLimitLayer = new ConcurrentLimitLayer(1024)
+    const layerOp = op.layer(concurrentLimitLayer.build())
+
+    assert.ok(layerOp)
+    assert.ok(layerOp.capability())
+  })
+
+  test('test operator with concurrent limit layer and http permits', () => {
+    const concurrentLimitLayer = new ConcurrentLimitLayer(1024)
+    concurrentLimitLayer.httpPermits = 512
+
+    const layerOp = op.layer(concurrentLimitLayer.build())
+
+    assert.ok(layerOp)
+    assert.ok(layerOp.capability())
+  })
+}

Reply via email to