This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch nodejs-stream in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit a515ce4f054bd986c82b45595a8c4989c0c114a9 Author: suyanhanx <[email protected]> AuthorDate: Sun Nov 19 00:34:37 2023 +0800 stream init Signed-off-by: suyanhanx <[email protected]> --- bindings/nodejs/generated.js | 4 +- bindings/nodejs/index.d.ts | 72 ++++++++++++++++++ bindings/nodejs/index.js | 24 ++++++ bindings/nodejs/src/lib.rs | 106 +++++++++++++++++++++++++++ bindings/nodejs/tests/fixtures/random.txt | 1 + bindings/nodejs/tests/suites/async.suite.mjs | 12 +++ bindings/nodejs/tests/suites/sync.suite.mjs | 14 ++++ 7 files changed, 232 insertions(+), 1 deletion(-) diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js index b21cd679e..6f1f80175 100644 --- a/bindings/nodejs/generated.js +++ b/bindings/nodejs/generated.js @@ -271,12 +271,14 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Capability, Operator, Entry, Metadata, Lister, BlockingLister, Layer, RetryLayer } = nativeBinding +const { Capability, Operator, Entry, Metadata, BlockingWriter, Writer, Lister, BlockingLister, Layer, RetryLayer } = nativeBinding module.exports.Capability = Capability module.exports.Operator = Operator module.exports.Entry = Entry module.exports.Metadata = Metadata +module.exports.BlockingWriter = BlockingWriter +module.exports.Writer = Writer module.exports.Lister = Lister module.exports.BlockingLister = BlockingLister module.exports.Layer = Layer diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts index 0ef6a3e94..560cd0eda 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/index.d.ts @@ -259,6 +259,8 @@ export class Operator { * ``` */ write(path: string, content: Buffer | string): Promise<void> + writer(path: string): Promise<Writer> + writerSync(path: string): BlockingWriter /** * Write bytes into path synchronously. 
* @@ -541,6 +543,76 @@ export class Metadata { */ get lastModified(): string | null } +export class BlockingWriter { + /** + * # Safety + * + * > &mut self in async napi methods should be marked as unsafe + * + * Write bytes into this writer. + * + * ### Example + * ```javascript + * const writer = op.writerSync("path/to/file"); + * writer.write(Buffer.from("hello world")); + * writer.close(); + * ``` + */ + write(content: Buffer | string): void + /** + * # Safety + * + * > &mut self in async napi methods should be marked as unsafe + * + * Close this writer. + * + * ### Example + * + * ```javascript + * const writer = op.writerSync("path/to/file"); + * writer.write(Buffer.from("hello world")); + * writer.close(); + * ``` + */ + close(): void +} +/** + * Writer + * + * Could be used to write bytes into a file. + */ +export class Writer { + /** + * # Safety + * + * > &mut self in async napi methods should be marked as unsafe + * + * Write bytes into this writer. + * + * ### Example + * ```javascript + * const writer = await op.writer("path/to/file"); + * await writer.write(Buffer.from("hello world")); + * await writer.close(); + * ``` + */ + write(content: Buffer | string): Promise<void> + /** + * # Safety + * + * > &mut self in async napi methods should be marked as unsafe + * + * Close this writer. 
+ * + * ### Example + * ```javascript + * const writer = await op.writer("path/to/file"); + * await writer.write(Buffer.from("hello world")); + * await writer.close(); + * ``` + */ + close(): Promise<void> +} export class Lister { /** * # Safety diff --git a/bindings/nodejs/index.js b/bindings/nodejs/index.js index c1af66747..c60016a08 100644 --- a/bindings/nodejs/index.js +++ b/bindings/nodejs/index.js @@ -19,9 +19,33 @@ /// <reference types="node" /> +const { Writable } = require('node:stream') + +class WriterStream extends Writable { + constructor(operator, path, options) { + super(options) + this.writer = operator.writerSync(path) + } + + _write(chunk, encoding, callback) { + this.writer.write(chunk) + if (typeof callback === 'function') { + callback() + } + } + + _final(callback) { + this.writer.close() + if (typeof callback === 'function') { + callback() + } + } +} + const { Operator, RetryLayer } = require('./generated.js') module.exports.Operator = Operator module.exports.layers = { RetryLayer, } +module.exports.WriterStream = WriterStream diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index e243ce8b1..da8bbb574 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -205,6 +205,18 @@ impl Operator { self.0.write(&path, c).await.map_err(format_napi_error) } + #[napi] + pub async fn writer(&self, path: String) -> Result<Writer> { + let w = self.0.writer(&path).await.map_err(format_napi_error)?; + Ok(Writer(w)) + } + + #[napi] + pub fn writer_sync(&self, path: String) -> Result<BlockingWriter>{ + let w = self.0.blocking().writer(&path).map_err(format_napi_error)?; + Ok(BlockingWriter(w)) + } + /// Write bytes into path synchronously. /// /// ### Example @@ -616,6 +628,100 @@ impl Metadata { } } +#[napi] +pub struct BlockingWriter(opendal::BlockingWriter); + +#[napi] +impl BlockingWriter{ + /// # Safety + /// + /// > &mut self in async napi methods should be marked as unsafe + /// + /// Write bytes into this writer. 
+ /// + /// ### Example + /// ```javascript + /// const writer = op.writerSync("path/to/file"); + /// writer.write(Buffer.from("hello world")); + /// writer.close(); + /// ``` + #[napi] + pub unsafe fn write(&mut self, content: Either<Buffer, String>) -> Result<()> { + let c = match content { + Either::A(buf) => buf.as_ref().to_owned(), + Either::B(s) => s.into_bytes(), + }; + self.0.write(c).map_err(format_napi_error) + } + + /// # Safety + /// + /// > &mut self in async napi methods should be marked as unsafe + /// + /// Close this writer. + /// + /// ### Example + /// + /// ```javascript + /// const writer = op.writerSync("path/to/file"); + /// writer.write(Buffer.from("hello world")); + /// writer.close(); + /// ``` + #[napi] + pub unsafe fn close(&mut self) -> Result<()> { + self.0.close().map_err(format_napi_error) + } +} + +/// Writer +/// +/// Could be used to write bytes into a file. +#[napi] +pub struct Writer(opendal::Writer); + +#[napi] +impl Writer { + /// # Safety + /// + /// > &mut self in async napi methods should be marked as unsafe + /// + /// Write bytes into this writer. + /// + /// ### Example + /// ```javascript + /// const writer = await op.writer("path/to/file"); + /// await writer.write(Buffer.from("hello world")); + /// await writer.close(); + /// ``` + #[napi] + pub async unsafe fn write(&mut self, content: Either<Buffer, String>) -> Result<()> { + let c = match content { + Either::A(buf) => buf.as_ref().to_owned(), + Either::B(s) => s.into_bytes(), + }; + self.0.write(c).await.map_err(format_napi_error) + } + + + + /// # Safety + /// + /// > &mut self in async napi methods should be marked as unsafe + /// + /// Close this writer. 
+ /// + /// ### Example + /// ```javascript + /// const writer = await op.writer("path/to/file"); + /// await writer.write(Buffer.from("hello world")); + /// await writer.close(); + /// ``` + #[napi] + pub async unsafe fn close(&mut self) -> Result<()> { + self.0.close().await.map_err(format_napi_error) + } +} + #[napi] pub struct Lister(opendal::Lister); diff --git a/bindings/nodejs/tests/fixtures/random.txt b/bindings/nodejs/tests/fixtures/random.txt new file mode 100644 index 000000000..9896b0e28 --- /dev/null +++ b/bindings/nodejs/tests/fixtures/random.txt @@ -0,0 +1 @@ +opendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalopendalop [...] 
diff --git a/bindings/nodejs/tests/suites/async.suite.mjs b/bindings/nodejs/tests/suites/async.suite.mjs index 4e6545661..f834a7008 100644 --- a/bindings/nodejs/tests/suites/async.suite.mjs +++ b/bindings/nodejs/tests/suites/async.suite.mjs @@ -19,6 +19,7 @@ import { randomUUID } from 'node:crypto' import { test } from 'vitest' +import { generateBytes } from '../utils.mjs' export function run(operator) { test('async stat not exist files', async () => { @@ -30,4 +31,15 @@ export function run(operator) { assert.include(error.message, 'NotFound') } }) + + test('async writer', async () => { + const filename = `random_file_${randomUUID()}` + const writer = await operator.writer(filename) + const data = generateBytes(1024) + await writer.write(data) + await writer.write(data) + await writer.close() + const stat = await operator.stat(filename) + assert.equal(stat.contentLength, data.length * 2) + }) } diff --git a/bindings/nodejs/tests/suites/sync.suite.mjs b/bindings/nodejs/tests/suites/sync.suite.mjs index d27a3895a..732f8e5b2 100644 --- a/bindings/nodejs/tests/suites/sync.suite.mjs +++ b/bindings/nodejs/tests/suites/sync.suite.mjs @@ -19,6 +19,9 @@ import { randomUUID } from 'node:crypto' import { test } from 'vitest' +import fs from 'node:fs' +import path from 'node:path' +import { WriterStream } from '../../index.js' export function run(operator) { test('sync stat not exist files', () => { @@ -30,4 +33,15 @@ export function run(operator) { assert.include(error.message, 'NotFound') } }) + + test('blocking writer stream', async () => { + const filename = `random_file_${randomUUID()}` + const r = fs.createReadStream(path.resolve(__dirname, '../fixtures/random.txt')) + const w = new WriterStream(operator, filename) + r.pipe(w) + w.on('finish', () => { + const t = operator.statSync(filename) + assert.equal(t.contentLength, fs.statSync(path.resolve(__dirname, '../fixtures/random.txt')).size) + }) + }) }
