This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch nodejs-stream
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit a515ce4f054bd986c82b45595a8c4989c0c114a9
Author: suyanhanx <[email protected]>
AuthorDate: Sun Nov 19 00:34:37 2023 +0800

    stream init
    
    Signed-off-by: suyanhanx <[email protected]>
---
 bindings/nodejs/generated.js                 |   4 +-
 bindings/nodejs/index.d.ts                   |  72 ++++++++++++++++++
 bindings/nodejs/index.js                     |  24 ++++++
 bindings/nodejs/src/lib.rs                   | 106 +++++++++++++++++++++++++++
 bindings/nodejs/tests/fixtures/random.txt    |   1 +
 bindings/nodejs/tests/suites/async.suite.mjs |  12 +++
 bindings/nodejs/tests/suites/sync.suite.mjs  |  14 ++++
 7 files changed, 232 insertions(+), 1 deletion(-)

diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js
index b21cd679e..6f1f80175 100644
--- a/bindings/nodejs/generated.js
+++ b/bindings/nodejs/generated.js
@@ -271,12 +271,14 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }
 
-const { Capability, Operator, Entry, Metadata, Lister, BlockingLister, Layer, 
RetryLayer } = nativeBinding
+const { Capability, Operator, Entry, Metadata, BlockingWriter, Writer, Lister, 
BlockingLister, Layer, RetryLayer } = nativeBinding
 
 module.exports.Capability = Capability
 module.exports.Operator = Operator
 module.exports.Entry = Entry
 module.exports.Metadata = Metadata
+module.exports.BlockingWriter = BlockingWriter
+module.exports.Writer = Writer
 module.exports.Lister = Lister
 module.exports.BlockingLister = BlockingLister
 module.exports.Layer = Layer
diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts
index 0ef6a3e94..560cd0eda 100644
--- a/bindings/nodejs/index.d.ts
+++ b/bindings/nodejs/index.d.ts
@@ -259,6 +259,8 @@ export class Operator {
    * ```
    */
   write(path: string, content: Buffer | string): Promise<void>
+  writer(path: string): Promise<Writer>
+  writerSync(path: string): BlockingWriter
   /**
    * Write bytes into path synchronously.
    *
@@ -541,6 +543,76 @@ export class Metadata {
    */
   get lastModified(): string | null
 }
+/**
+ * BlockingWriter
+ *
+ * Could be used to write bytes into a file synchronously.
+ * Obtained from `Operator.writerSync(path)`.
+ */
+export class BlockingWriter {
+  /**
+   * Write bytes into this writer.
+   *
+   * @param content - bytes to write, given as a `Buffer` or a string.
+   *
+   * ### Example
+   * ```javascript
+   * const writer = op.writerSync("path/to/file");
+   * writer.write(Buffer.from("hello world"));
+   * writer.close();
+   * ```
+   */
+  write(content: Buffer | string): void
+  /**
+   * Close this writer.
+   *
+   * ### Example
+   *
+   * ```javascript
+   * const writer = op.writerSync("path/to/file");
+   * writer.write(Buffer.from("hello world"));
+   * writer.close();
+   * ```
+   */
+  close(): void
+}
+/**
+ * Writer
+ *
+ * Could be used to write bytes into a file.
+ * Obtained from `Operator.writer(path)`.
+ */
+export class Writer {
+  /**
+   * # Safety
+   *
+   * > &mut self in async napi methods should be marked as unsafe
+   *
+   * Write bytes into this writer.
+   *
+   * @param content - bytes to write, given as a `Buffer` or a string.
+   * @returns a promise for the completion of the write call.
+   *
+   * ### Example
+   * ```javascript
+   * const writer = await op.writer("path/to/file");
+   * await writer.write(Buffer.from("hello world"));
+   * await writer.close();
+   * ```
+   */
+  write(content: Buffer | string): Promise<void>
+  /**
+   * # Safety
+   *
+   * > &mut self in async napi methods should be marked as unsafe
+   *
+   * Close this writer.
+   *
+   * @returns a promise for the completion of the close call.
+   *
+   * ### Example
+   * ```javascript
+   * const writer = await op.writer("path/to/file");
+   * await writer.write(Buffer.from("hello world"));
+   * await writer.close();
+   * ```
+   */
+  close(): Promise<void>
+}
 export class Lister {
   /**
    * # Safety
diff --git a/bindings/nodejs/index.js b/bindings/nodejs/index.js
index c1af66747..c60016a08 100644
--- a/bindings/nodejs/index.js
+++ b/bindings/nodejs/index.js
@@ -19,9 +19,33 @@
 
 /// <reference types="node" />
 
+const { Writable } = require('node:stream')
+
+class WriterStream extends Writable {
+  constructor(operator, path, options) {
+    super(options)
+    this.writer = operator.writerSync(path)
+  }
+
+  _write(chunk, encoding, callback) {
+    this.writer.write(chunk)
+    if (typeof callback === 'function') {
+      callback()
+    }
+  }
+
+  _final(callback) {
+    this.writer.close()
+    if (typeof callback === 'function') {
+      callback()
+    }
+  }
+}
+
 const { Operator, RetryLayer } = require('./generated.js')
 
 module.exports.Operator = Operator
 module.exports.layers = {
   RetryLayer,
 }
+module.exports.WriterStream = WriterStream
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index e243ce8b1..da8bbb574 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -205,6 +205,18 @@ impl Operator {
         self.0.write(&path, c).await.map_err(format_napi_error)
     }
 
+    /// Create a [`Writer`] for the given path.
+    ///
+    /// ### Example
+    /// ```javascript
+    /// const writer = await op.writer("path/to/file");
+    /// await writer.write(Buffer.from("hello world"));
+    /// await writer.close();
+    /// ```
+    #[napi]
+    pub async fn writer(&self, path: String) -> Result<Writer> {
+        self.0
+            .writer(&path)
+            .await
+            .map(Writer)
+            .map_err(format_napi_error)
+    }
+
+    /// Create a [`BlockingWriter`] for the given path.
+    ///
+    /// ### Example
+    /// ```javascript
+    /// const writer = op.writerSync("path/to/file");
+    /// writer.write(Buffer.from("hello world"));
+    /// writer.close();
+    /// ```
+    #[napi]
+    pub fn writer_sync(&self, path: String) -> Result<BlockingWriter> {
+        self.0
+            .blocking()
+            .writer(&path)
+            .map(BlockingWriter)
+            .map_err(format_napi_error)
+    }
+
     /// Write bytes into path synchronously.
     ///
     /// ### Example
@@ -616,6 +628,100 @@ impl Metadata {
     }
 }
 
+/// BlockingWriter
+///
+/// Could be used to write bytes into a file synchronously.
+#[napi]
+pub struct BlockingWriter(opendal::BlockingWriter);
+
+#[napi]
+impl BlockingWriter {
+    /// Write bytes into this writer.
+    ///
+    /// ### Example
+    /// ```javascript
+    /// const writer = op.writerSync("path/to/file");
+    /// writer.write(Buffer.from("hello world"));
+    /// writer.close();
+    /// ```
+    // Not `unsafe`: napi only asks for `unsafe` on *async* methods that take
+    // `&mut self`, and this method is synchronous.
+    #[napi]
+    pub fn write(&mut self, content: Either<Buffer, String>) -> Result<()> {
+        // Copy the JS payload into an owned byte vector for the writer.
+        let c = match content {
+            Either::A(buf) => buf.as_ref().to_owned(),
+            Either::B(s) => s.into_bytes(),
+        };
+        self.0.write(c).map_err(format_napi_error)
+    }
+
+    /// Close this writer.
+    ///
+    /// ### Example
+    ///
+    /// ```javascript
+    /// const writer = op.writerSync("path/to/file");
+    /// writer.write(Buffer.from("hello world"));
+    /// writer.close();
+    /// ```
+    #[napi]
+    pub fn close(&mut self) -> Result<()> {
+        self.0.close().map_err(format_napi_error)
+    }
+}
+
+/// Writer
+///
+/// Could be used to write bytes into a file.
+#[napi]
+pub struct Writer(opendal::Writer);
+
+#[napi]
+impl Writer {
+    /// Write bytes into this writer.
+    ///
+    /// # Safety
+    ///
+    /// > &mut self in async napi methods should be marked as unsafe
+    ///
+    /// ### Example
+    /// ```javascript
+    /// const writer = await op.writer("path/to/file");
+    /// await writer.write(Buffer.from("hello world"));
+    /// await writer.close();
+    /// ```
+    #[napi]
+    pub async unsafe fn write(&mut self, content: Either<Buffer, String>) -> Result<()> {
+        // Normalise both accepted JS input kinds into an owned byte vector.
+        let bytes = match content {
+            Either::A(buffer) => buffer.as_ref().to_owned(),
+            Either::B(text) => text.into_bytes(),
+        };
+        let outcome = self.0.write(bytes).await;
+        outcome.map_err(format_napi_error)
+    }
+
+    /// Close this writer.
+    ///
+    /// # Safety
+    ///
+    /// > &mut self in async napi methods should be marked as unsafe
+    ///
+    /// ### Example
+    /// ```javascript
+    /// const writer = await op.writer("path/to/file");
+    /// await writer.write(Buffer.from("hello world"));
+    /// await writer.close();
+    /// ```
+    #[napi]
+    pub async unsafe fn close(&mut self) -> Result<()> {
+        let outcome = self.0.close().await;
+        outcome.map_err(format_napi_error)
+    }
+}
+
 #[napi]
 pub struct Lister(opendal::Lister);
 
diff --git a/bindings/nodejs/tests/fixtures/random.txt 
b/bindings/nodejs/tests/fixtures/random.txt
new file mode 100644
index 000000000..9896b0e28
--- /dev/null
+++ b/bindings/nodejs/tests/fixtures/random.txt
@@ -0,0 +1 @@
+opendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalop
 [...]
diff --git a/bindings/nodejs/tests/suites/async.suite.mjs 
b/bindings/nodejs/tests/suites/async.suite.mjs
index 4e6545661..f834a7008 100644
--- a/bindings/nodejs/tests/suites/async.suite.mjs
+++ b/bindings/nodejs/tests/suites/async.suite.mjs
@@ -19,6 +19,7 @@
 
 import { randomUUID } from 'node:crypto'
 import { test } from 'vitest'
+import { generateBytes } from '../utils.mjs'
 
 export function run(operator) {
   test('async stat not exist files', async () => {
@@ -30,4 +31,15 @@ export function run(operator) {
       assert.include(error.message, 'NotFound')
     }
   })
+
+  test('async writer', async () => {
+    const filename = `random_file_${randomUUID()}`
+    const writer = await operator.writer(filename)
+    const data = generateBytes(1024)
+    await writer.write(data)
+    await writer.write(data)
+    await writer.close()
+    const stat = await operator.stat(filename)
+    assert.equal(stat.contentLength, data.length * 2)
+  })
 }
diff --git a/bindings/nodejs/tests/suites/sync.suite.mjs 
b/bindings/nodejs/tests/suites/sync.suite.mjs
index d27a3895a..732f8e5b2 100644
--- a/bindings/nodejs/tests/suites/sync.suite.mjs
+++ b/bindings/nodejs/tests/suites/sync.suite.mjs
@@ -19,6 +19,9 @@
 
 import { randomUUID } from 'node:crypto'
 import { test } from 'vitest'
+import fs from 'node:fs'
+import path from 'node:path'
+import { WriterStream } from '../../index.js'
 
 export function run(operator) {
   test('sync stat not exist files', () => {
@@ -30,4 +33,15 @@ export function run(operator) {
       assert.include(error.message, 'NotFound')
     }
   })
+
+  test('blocking writer stream', async () => {
+    const filename = `random_file_${randomUUID()}`
+    const r = fs.createReadStream(path.resolve(__dirname, 
'../fixtures/random.txt'))
+    const w = new WriterStream(operator, filename)
+    r.pipe(w)
+    w.on('finish', () => {
+      const t = operator.statSync(filename)
+      assert.equal(t.contentLength, fs.statSync(path.resolve(__dirname, 
'../fixtures/random.txt')).size)
+    })
+  })
 }

Reply via email to