This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch nodejs-stream in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c0a01e154a8a100af127ef8b6050cfab6c2182a5 Author: suyanhanx <[email protected]> AuthorDate: Mon Nov 27 19:47:47 2023 +0800 polish Signed-off-by: suyanhanx <[email protected]> --- bindings/nodejs/{index.d.ts => generated.d.ts} | 59 ++- bindings/nodejs/generated.js | 3 +- bindings/nodejs/index.d.ts | 700 +------------------------ bindings/nodejs/index.js | 94 +++- bindings/nodejs/package.json | 7 +- bindings/nodejs/scripts/header.js | 2 +- bindings/nodejs/src/lib.rs | 53 +- bindings/nodejs/tests/suites/async.suite.mjs | 49 +- bindings/nodejs/tests/suites/sync.suite.mjs | 46 +- bindings/nodejs/tests/utils.mjs | 1 - bindings/nodejs/yarn.lock | 16 +- 11 files changed, 273 insertions(+), 757 deletions(-) diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/generated.d.ts similarity index 92% copy from bindings/nodejs/index.d.ts copy to bindings/nodejs/generated.d.ts index 28e480328..c5e1d1ba8 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/generated.d.ts @@ -28,6 +28,7 @@ export class ExternalObject<T> { [K: symbol]: T } } +/** PresignedRequest is a presigned request return by `presign`. */ export interface PresignedRequest { /** HTTP method of this request. */ method: string @@ -239,6 +240,12 @@ export class Operator { * ``` */ read(path: string): Promise<Buffer> + /** + * Create a reader to read the given path. + * + * It could be used to read large file in a streaming way. + */ + reader(path: string): Promise<Reader> /** * Read the whole path into a buffer synchronously. * @@ -248,6 +255,11 @@ export class Operator { * ``` */ readSync(path: string): Buffer + /** + * Create a reader to read the given path synchronously. + * + * It could be used to read large file in a streaming way. + */ readerSync(path: string): BlockingReader /** * Write bytes into path. @@ -260,7 +272,17 @@ export class Operator { * ``` */ write(path: string, content: Buffer | string): Promise<void> + /** + * Write multiple bytes into path. + * + * It could be used to write large file in a streaming way. + */ writer(path: string): Promise<Writer> + /** + * Write multiple bytes into path synchronously. + * + * It could be used to write large file in a streaming way. + */ writerSync(path: string): BlockingWriter /** * Write bytes into path synchronously. @@ -518,10 +540,12 @@ export class Operator { /** Add a layer to this operator. */ layer(layer: ExternalObject<Layer>): this } +/** Entry returned by Lister or BlockingLister to represent a path and it's relative metadata. */ export class Entry { /** Return the path of this entry. */ path(): string } +/** Metadata carries all metadata associated with a path. */ export class Metadata { /** Returns true if the <op.stat> object describes a file system directory. */ isDirectory(): boolean @@ -544,9 +568,31 @@ export class Metadata { */ get lastModified(): string | null } +/** + * BlockingReader is designed to read data from given path in an blocking + * manner. + */ export class BlockingReader { read(buf: Buffer): bigint } +/** + * Reader is designed to read data from given path in an asynchronous + * manner. + */ +export class Reader { + /** + * # Safety + * + * > &mut self in async napi methods should be marked as unsafe + * + * Read bytes from this reader into given buffer. + */ + read(buf: Buffer): Promise<bigint> +} +/** + * BlockingWriter is designed to write data into given path in an blocking + * manner. + */ export class BlockingWriter { /** * # Safety @@ -581,9 +627,8 @@ export class BlockingWriter { close(): void } /** - * Writer - * - * Could be used to write bytes into a file. + * Writer is designed to write data into given path in an asynchronous + * manner. */ export class Writer { /** @@ -617,6 +662,10 @@ export class Writer { */ close(): Promise<void> } +/** + * Lister is designed to list entries at given path in an asynchronous + * manner. + */ export class Lister { /** * # Safety @@ -628,6 +677,10 @@ export class Lister { */ next(): Promise<Entry | null> } +/** + * BlockingLister is designed to list entries at given path in a blocking + * manner. + */ export class BlockingLister { next(): Entry | null } diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js index 4af2ebbb3..385177b58 100644 --- a/bindings/nodejs/generated.js +++ b/bindings/nodejs/generated.js @@ -271,13 +271,14 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Capability, Operator, Entry, Metadata, BlockingReader, BlockingWriter, Writer, Lister, BlockingLister, Layer, RetryLayer } = nativeBinding +const { Capability, Operator, Entry, Metadata, BlockingReader, Reader, BlockingWriter, Writer, Lister, BlockingLister, Layer, RetryLayer } = nativeBinding module.exports.Capability = Capability module.exports.Operator = Operator module.exports.Entry = Entry module.exports.Metadata = Metadata module.exports.BlockingReader = BlockingReader +module.exports.Reader = Reader module.exports.BlockingWriter = BlockingWriter module.exports.Writer = Writer module.exports.Lister = Lister diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts index 28e480328..fb18dab95 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/index.d.ts @@ -17,687 +17,27 @@ * under the License. */ -/* tslint:disable */ -/* eslint-disable */ +import { Readable, ReadableOptions, Writable, WritableOptions } from "node:stream" -/* auto-generated by NAPI-RS */ +declare module './generated' { + interface BlockingReader { + /** Create a readable stream from the underlying reader. */ + createReadStream(options?: ReadableOptions): Readable + } + interface Reader { + /** Create a readable stream from the underlying reader. */ + createReadStream(options?: ReadableOptions): Readable + } -export class ExternalObject<T> { - readonly '': { - readonly '': unique symbol - [K: symbol]: T + interface BlockingWriter { + /** Create a writable stream from the underlying writer. */ + createWriteStream(options?: WritableOptions): Writable + } + + interface Writer { + /** Create a writable stream from the underlying writer. */ + createWriteStream(options?: WritableOptions): Writable } } -export interface PresignedRequest { - /** HTTP method of this request. */ - method: string - /** URL of this request. */ - url: string - /** HTTP headers of this request. */ - headers: Record<string, string> -} -/** - * Capability is used to describe what operations are supported - * by current Operator. - * - * Via capability, we can know: - * - * - Whether current Operator supports read or not. - * - Whether current Operator supports read with if match or not. - * - What's current Operator max supports batch operations count. - * - * Add fields of Capabilities with be public and can be accessed directly. - */ -export class Capability { - /** If operator supports stat. */ - get stat(): boolean - /** If operator supports stat with if match. */ - get statWithIfMatch(): boolean - /** If operator supports stat with if none match. */ - get statWithIfNoneMatch(): boolean - /** If operator supports read. */ - get read(): boolean - /** If operator supports seek on returning reader. */ - get readCanSeek(): boolean - /** If operator supports next on returning reader. */ - get readCanNext(): boolean - /** If operator supports read with range. */ - get readWithRange(): boolean - /** If operator supports read with if match. */ - get readWithIfMatch(): boolean - /** If operator supports read with if none match. */ - get readWithIfNoneMatch(): boolean - /** if operator supports read with override cache control. */ - get readWithOverrideCacheControl(): boolean - /** if operator supports read with override content disposition. */ - get readWithOverrideContentDisposition(): boolean - /** if operator supports read with override content type. */ - get readWithOverrideContentType(): boolean - /** If operator supports write. */ - get write(): boolean - /** If operator supports write can be called in multi times. */ - get writeCanMulti(): boolean - /** If operator supports write with empty content. */ - get writeCanEmpty(): boolean - /** If operator supports write by append. */ - get writeCanAppend(): boolean - /** If operator supports write with content type. */ - get writeWithContentType(): boolean - /** If operator supports write with content disposition. */ - get writeWithContentDisposition(): boolean - /** If operator supports write with cache control. */ - get writeWithCacheControl(): boolean - /** - * write_multi_max_size is the max size that services support in write_multi. - * - * For example, AWS S3 supports 5GiB as max in write_multi. - */ - get writeMultiMaxSize(): bigint | null - /** - * write_multi_min_size is the min size that services support in write_multi. - * - * For example, AWS S3 requires at least 5MiB in write_multi expect the last one. - */ - get writeMultiMinSize(): bigint | null - /** - * write_multi_align_size is the align size that services required in write_multi. - * - * For example, Google GCS requires align size to 256KiB in write_multi. - */ - get writeMultiAlignSize(): bigint | null - /** - * write_total_max_size is the max size that services support in write_total. - * - * For example, Cloudflare D1 supports 1MB as max in write_total. - */ - get writeTotalMaxSize(): bigint | null - /** If operator supports create dir. */ - get createDir(): boolean - /** If operator supports delete. */ - get delete(): boolean - /** If operator supports copy. */ - get copy(): boolean - /** If operator supports rename. */ - get rename(): boolean - /** If operator supports list. */ - get list(): boolean - /** If backend supports list with limit. */ - get listWithLimit(): boolean - /** If backend supports list with start after. */ - get listWithStartAfter(): boolean - /** If backend supports list with recursive. */ - get listWithRecursive(): boolean - /** If backend supports list without recursive. */ - get listWithoutRecursive(): boolean - /** If operator supports presign. */ - get presign(): boolean - /** If operator supports presign read. */ - get presignRead(): boolean - /** If operator supports presign stat. */ - get presignStat(): boolean - /** If operator supports presign write. */ - get presignWrite(): boolean - /** If operator supports batch. */ - get batch(): boolean - /** If operator supports batch delete. */ - get batchDelete(): boolean - /** The max operations that operator supports in batch. */ - get batchMaxOperations(): bigint | null - /** If operator supports blocking. */ - get blocking(): boolean -} -export class Operator { - /** @see For a detailed definition of scheme, see https://opendal.apache.org/docs/category/services */ - constructor(scheme: string, options?: Record<string, string> | undefined | null) - /** Get current operator(service)'s full capability. */ - capability(): Capability - /** - * Get current path's metadata **without cache** directly. - * - * ### Notes - * Use stat if you: - * - * - Want detect the outside changes of path. - * - Don’t want to read from cached metadata. - * - * You may want to use `metadata` if you are working with entries returned by `Lister`. It’s highly possible that metadata you want has already been cached. - * - * ### Example - * ```javascript - * const meta = await op.stat("test"); - * if (meta.isDir) { - * // do something - * } - * ``` - */ - stat(path: string): Promise<Metadata> - /** - * Get current path's metadata **without cache** directly and synchronously. - * - * ### Example - * ```javascript - * const meta = op.statSync("test"); - * if (meta.isDir) { - * // do something - * } - * ``` - */ - statSync(path: string): Metadata - /** - * Check if this operator can work correctly. - * - * We will send a `list` request to path and return any errors we met. - * - * ### Example - * ```javascript - * await op.check(); - * ``` - */ - check(): Promise<void> - /** - * Check if this path exists or not. - * - * ### Example - * ```javascript - * await op.isExist("test"); - * ``` - */ - isExist(path: string): Promise<boolean> - /** - * Check if this path exists or not synchronously. - * - * ### Example - * ```javascript - * op.isExistSync("test"); - * ``` - */ - isExistSync(path: string): boolean - /** - * Create dir with given path. - * - * ### Example - * ```javascript - * await op.createDir("path/to/dir/"); - * ``` - */ - createDir(path: string): Promise<void> - /** - * Create dir with given path synchronously. - * - * ### Example - * ```javascript - * op.createDirSync("path/to/dir/"); - * ``` - */ - createDirSync(path: string): void - /** - * Read the whole path into a buffer. - * - * ### Example - * ```javascript - * const buf = await op.read("path/to/file"); - * ``` - */ - read(path: string): Promise<Buffer> - /** - * Read the whole path into a buffer synchronously. - * - * ### Example - * ```javascript - * const buf = op.readSync("path/to/file"); - * ``` - */ - readSync(path: string): Buffer - readerSync(path: string): BlockingReader - /** - * Write bytes into path. - * - * ### Example - * ```javascript - * await op.write("path/to/file", Buffer.from("hello world")); - * // or - * await op.write("path/to/file", "hello world"); - * ``` - */ - write(path: string, content: Buffer | string): Promise<void> - writer(path: string): Promise<Writer> - writerSync(path: string): BlockingWriter - /** - * Write bytes into path synchronously. - * - * ### Example - * ```javascript - * op.writeSync("path/to/file", Buffer.from("hello world")); - * // or - * op.writeSync("path/to/file", "hello world"); - * ``` - */ - writeSync(path: string, content: Buffer | string): void - /** - * Append bytes into path. - * - * ### Notes - * - * - It always appends content to the end of the file. - * - It will create file if the path not exists. - * - * ### Example - * ```javascript - * await op.append("path/to/file", Buffer.from("hello world")); - * // or - * await op.append("path/to/file", "hello world"); - * ``` - */ - append(path: string, content: Buffer | string): Promise<void> - /** - * Copy file according to given `from` and `to` path. - * - * ### Example - * ```javascript - * await op.copy("path/to/file", "path/to/dest"); - * ``` - */ - copy(from: string, to: string): Promise<void> - /** - * Copy file according to given `from` and `to` path synchronously. - * - * ### Example - * ```javascript - * op.copySync("path/to/file", "path/to/dest"); - * ``` - */ - copySync(from: string, to: string): void - /** - * Rename file according to given `from` and `to` path. - * - * It's similar to `mv` command. - * - * ### Example - * ```javascript - * await op.rename("path/to/file", "path/to/dest"); - * ``` - */ - rename(from: string, to: string): Promise<void> - /** - * Rename file according to given `from` and `to` path synchronously. - * - * It's similar to `mv` command. - * - * ### Example - * ```javascript - * op.renameSync("path/to/file", "path/to/dest"); - * ``` - */ - renameSync(from: string, to: string): void - /** - * List dir in flat way. - * - * This function will create a new handle to list entries. - * - * An error will be returned if given path doesn't end with /. - * - * ### Example - * - * ```javascript - * const lister = await op.scan("/path/to/dir/"); - * while (true) { - * const entry = await lister.next(); - * if (entry === null) { - * break; - * } - * let meta = await op.stat(entry.path); - * if (meta.is_file) { - * // do something - * } - * } - * ````` - */ - scan(path: string): Promise<Lister> - /** - * List dir in flat way synchronously. - * - * This function will create a new handle to list entries. - * - * An error will be returned if given path doesn't end with /. - * - * ### Example - * ```javascript - * const lister = op.scan_sync(/path/to/dir/"); - * while (true) { - * const entry = lister.next(); - * if (entry === null) { - * break; - * } - * let meta = op.statSync(entry.path); - * if (meta.is_file) { - * // do something - * } - * } - * ````` - */ - scanSync(path: string): BlockingLister - /** - * Delete the given path. - * - * ### Notes - * Delete not existing error won’t return errors. - * - * ### Example - * ```javascript - * await op.delete("test"); - * ``` - */ - delete(path: string): Promise<void> - /** - * Delete the given path synchronously. - * - * ### Example - * ```javascript - * op.deleteSync("test"); - * ``` - */ - deleteSync(path: string): void - /** - * Remove given paths. - * - * ### Notes - * If underlying services support delete in batch, we will use batch delete instead. - * - * ### Examples - * ```javascript - * await op.remove(["abc", "def"]); - * ``` - */ - remove(paths: Array<string>): Promise<void> - /** - * Remove the path and all nested dirs and files recursively. - * - * ### Notes - * If underlying services support delete in batch, we will use batch delete instead. - * - * ### Examples - * ```javascript - * await op.removeAll("path/to/dir/"); - * ``` - */ - removeAll(path: string): Promise<void> - /** - * List given path. - * - * This function will create a new handle to list entries. - * - * An error will be returned if given path doesn't end with `/`. - * - * ### Example - * ```javascript - * const lister = await op.list("path/to/dir/"); - * while (true) { - * const entry = await lister.next(); - * if (entry === null) { - * break; - * } - * let meta = await op.stat(entry.path); - * if (meta.isFile) { - * // do something - * } - * } - * ``` - */ - list(path: string): Promise<Lister> - /** - * List given path synchronously. - * - * This function will create a new handle to list entries. - * - * An error will be returned if given path doesn't end with `/`. - * - * ### Example - * ```javascript - * const lister = op.listSync("path/to/dir/"); - * while (true) { - * const entry = lister.next(); - * if (entry === null) { - * break; - * } - * let meta = op.statSync(entry.path); - * if (meta.isFile) { - * // do something - * } - * } - * ``` - */ - listSync(path: string): BlockingLister - /** - * Get a presigned request for read. - * - * Unit of expires is seconds. - * - * ### Example - * - * ```javascript - * const req = await op.presignRead(path, parseInt(expires)); - * - * console.log("method: ", req.method); - * console.log("url: ", req.url); - * console.log("headers: ", req.headers); - * ``` - */ - presignRead(path: string, expires: number): Promise<PresignedRequest> - /** - * Get a presigned request for write. - * - * Unit of expires is seconds. - * - * ### Example - * - * ```javascript - * const req = await op.presignWrite(path, parseInt(expires)); - * - * console.log("method: ", req.method); - * console.log("url: ", req.url); - * console.log("headers: ", req.headers); - * ``` - */ - presignWrite(path: string, expires: number): Promise<PresignedRequest> - /** - * Get a presigned request for stat. - * - * Unit of expires is seconds. - * - * ### Example - * - * ```javascript - * const req = await op.presignStat(path, parseInt(expires)); - * - * console.log("method: ", req.method); - * console.log("url: ", req.url); - * console.log("headers: ", req.headers); - * ``` - */ - presignStat(path: string, expires: number): Promise<PresignedRequest> - /** Add a layer to this operator. */ - layer(layer: ExternalObject<Layer>): this -} -export class Entry { - /** Return the path of this entry. */ - path(): string -} -export class Metadata { - /** Returns true if the <op.stat> object describes a file system directory. */ - isDirectory(): boolean - /** Returns true if the <op.stat> object describes a regular file. */ - isFile(): boolean - /** Content-Disposition of this object */ - get contentDisposition(): string | null - /** Content Length of this object */ - get contentLength(): bigint | null - /** Content MD5 of this object. */ - get contentMd5(): string | null - /** Content Type of this object. */ - get contentType(): string | null - /** ETag of this object. */ - get etag(): string | null - /** - * Last Modified of this object. - * - * We will output this time in RFC3339 format like `1996-12-19T16:39:57+08:00`. - */ - get lastModified(): string | null -} -export class BlockingReader { - read(buf: Buffer): bigint -} -export class BlockingWriter { - /** - * # Safety - * - * > &mut self in async napi methods should be marked as unsafe - * - * Write bytes into this writer. - * - * ### Example - * ```javascript - * const writer = await op.writer("path/to/file"); - * await writer.write(Buffer.from("hello world")); - * await writer.close(); - * ``` - */ - write(content: Buffer | string): void - /** - * # Safety - * - * > &mut self in async napi methods should be marked as unsafe - * - * Close this writer. - * - * ### Example - * - * ```javascript - * const writer = op.writerSync("path/to/file"); - * writer.write(Buffer.from("hello world")); - * writer.close(); - * ``` - */ - close(): void -} -/** - * Writer - * - * Could be used to write bytes into a file. - */ -export class Writer { - /** - * # Safety - * - * > &mut self in async napi methods should be marked as unsafe - * - * Write bytes into this writer. - * - * ### Example - * ```javascript - * const writer = await op.writer("path/to/file"); - * await writer.write(Buffer.from("hello world")); - * await writer.close(); - * ``` - */ - write(content: Buffer | string): Promise<void> - /** - * # Safety - * - * > &mut self in async napi methods should be marked as unsafe - * - * Close this writer. - * - * ### Example - * ```javascript - * const writer = await op.writer("path/to/file"); - * await writer.write(Buffer.from("hello world")); - * await writer.close(); - * ``` - */ - close(): Promise<void> -} -export class Lister { - /** - * # Safety - * - * > &mut self in async napi methods should be marked as unsafe - * - * napi will make sure the function is safe, and we didn't do unsafe - * thing internally. - */ - next(): Promise<Entry | null> -} -export class BlockingLister { - next(): Entry | null -} -/** A public layer wrapper */ -export class Layer { } -/** - * Retry layer - * - * Add retry for temporary failed operations. - * - * # Notes - * - * This layer will retry failed operations when [`Error::is_temporary`] - * returns true. If operation still failed, this layer will set error to - * `Persistent` which means error has been retried. - * - * `write` and `blocking_write` don't support retry so far, visit [this issue](https://github.com/apache/incubator-opendal/issues/1223) for more details. - * - * # Examples - * - * ```javascript - * const op = new Operator("file", { root: "/tmp" }) - * - * const retry = new RetryLayer(); - * retry.max_times = 3; - * retry.jitter = true; - * - * op.layer(retry.build()); - * ``` - */ -export class RetryLayer { - constructor() - /** - * Set jitter of current backoff. - * - * If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay) - * to current delay. - */ - set jitter(v: boolean) - /** - * Set max_times of current backoff. - * - * Backoff will return `None` if max times is reaching. - */ - set maxTimes(v: number) - /** - * Set factor of current backoff. - * - * # Panics - * - * This function will panic if input factor smaller than `1.0`. - */ - set factor(v: number) - /** - * Set max_delay of current backoff. - * - * Delay will not increasing if current delay is larger than max_delay. - * - * # Notes - * - * - The unit of max_delay is millisecond. - */ - set maxDelay(v: number) - /** - * Set min_delay of current backoff. - * - * # Notes - * - * - The unit of min_delay is millisecond. - */ - set minDelay(v: number) - build(): ExternalObject<Layer> -} + +export * from './generated' diff --git a/bindings/nodejs/index.js b/bindings/nodejs/index.js index 448ea597e..22ef628e0 100644 --- a/bindings/nodejs/index.js +++ b/bindings/nodejs/index.js @@ -22,20 +22,32 @@ const { Writable, Readable } = require('node:stream') class ReadStream extends Readable { - constructor(operator, path, options) { + constructor(reader, options) { super(options) - this.operator = operator - this.path = path - this.reader = null + this.reader = reader } - _construct(callback) { - try { - this.reader = this.operator.readerSync(this.path) - callback() - } catch (e) { - callback(e) - } + _read(size) { + const buf = Buffer.alloc(size) + this.reader + .read(buf) + .then((s) => { + if (s === 0n) { + this.push(null) + } else { + this.push(buf.subarray(0, Number(s))) + } + }) + .catch((e) => { + this.emit('error', e) + }) + } +} + +class BlockingReadStream extends Readable { + constructor(reader, options) { + super(options) + this.reader = reader } _read(size) { @@ -54,20 +66,38 @@ class ReadStream extends Readable { } class WriteStream extends Writable { - constructor(operator, path, options) { + constructor(writer, options) { super(options) - this.operator = operator - this.path = path - this.writer = null + this.writer = writer } - _construct(callback) { - try { - this.writer = this.operator.writerSync(this.path) - callback() - } catch (e) { - callback(e) - } + _write(chunk, encoding, callback) { + this.writer + .write(chunk) + .then(() => { + callback() + }) + .catch((e) => { + callback(e) + }) + } + + _final(callback) { + this.writer + .close() + .then(() => { + callback() + }) + .catch((e) => { + callback(e) + }) + } +} + +class BlockingWriteStream extends Writable { + constructor(writer, options) { + super(options) + this.writer = writer } _write(chunk, encoding, callback) { @@ -89,19 +119,25 @@ class WriteStream extends Writable { } } -const { Operator, RetryLayer } = require('./generated.js') +const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer } = require('./generated.js') + +BlockingReader.prototype.createReadStream = function (options) { + return new BlockingReadStream(this, options) +} + +Reader.prototype.createReadStream = function (options) { + return new ReadStream(this, options) +} -Operator.prototype.createWriteStream = function (path, options) { - return new WriteStream(this, path, options) +BlockingWriter.prototype.createWriteStream = function (options) { + return new BlockingWriteStream(this, options) } -Operator.prototype.createReadStream = function (path, options) { - return new ReadStream(this, path, options) +Writer.prototype.createWriteStream = function (options) { + return new WriteStream(this, options) } module.exports.Operator = Operator module.exports.layers = { RetryLayer, } -module.exports.WriteStream = WriteStream -module.exports.ReadStream = ReadStream diff --git a/bindings/nodejs/package.json b/bindings/nodejs/package.json index 366172015..8e7b8739a 100644 --- a/bindings/nodejs/package.json +++ b/bindings/nodejs/package.json @@ -41,6 +41,7 @@ "files": [ "index.d.ts", "index.js", + "generated.d.ts", "generated.js", "LICENSE", "NOTICE" @@ -55,7 +56,7 @@ "benny": "^3.7.1", "dotenv": "^16.0.3", "prettier": "^2.8.4", - "typedoc": "^0.24", + "typedoc": "^0.25", "typescript": "^5.0.2", "vitest": "^0.34.6" }, @@ -63,8 +64,8 @@ "node": ">= 10" }, "scripts": { - "build": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target \"${NAPI_TARGET:-}\" --release --js generated.js && node ./scripts/header.js", - "build:debug": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target \"${NAPI_TARGET:-}\" --js generated.js && node ./scripts/header.js", + "build": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target \"${NAPI_TARGET:-}\" --release --js generated.js --dts generated.d.ts && node ./scripts/header.js", + "build:debug": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target \"${NAPI_TARGET:-}\" --js generated.js --dts generated.d.ts && node ./scripts/header.js", "docs": "typedoc", "format": "prettier --write .", "test": "vitest", diff --git a/bindings/nodejs/scripts/header.js b/bindings/nodejs/scripts/header.js index fd10f52b6..6d15af5c4 100644 --- a/bindings/nodejs/scripts/header.js +++ b/bindings/nodejs/scripts/header.js @@ -19,7 +19,7 @@ const fs = require('fs') -let files = ['generated.js', 'index.d.ts'] +let files = ['generated.js', 'generated.d.ts'] for (path of files) { let data = fs.readFileSync(path, 'utf8') diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 0b9204668..bd3525bfc 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -25,6 +25,7 @@ use std::str::FromStr; use std::time::Duration; use opendal::raw::oio::BlockingRead; +use opendal::raw::oio::ReadExt; use futures::TryStreamExt; use napi::bindgen_prelude::*; @@ -178,6 +179,15 @@ impl Operator { Ok(res.into()) } + /// Create a reader to read the given path. + /// + /// It could be used to read large file in a streaming way. + #[napi] + pub async fn reader(&self, path: String) -> Result<Reader> { + let r = self.0.reader(&path).await.map_err(format_napi_error)?; + Ok(Reader(r)) + } + /// Read the whole path into a buffer synchronously. /// /// ### Example @@ -190,6 +200,9 @@ impl Operator { Ok(res.into()) } + /// Create a reader to read the given path synchronously. + /// + /// It could be used to read large file in a streaming way. #[napi] pub fn reader_sync(&self, path: String) -> Result<BlockingReader> { let r = self.0.blocking().reader(&path).map_err(format_napi_error)?; @@ -213,12 +226,18 @@ impl Operator { self.0.write(&path, c).await.map_err(format_napi_error) } + /// Write multiple bytes into path. + /// + /// It could be used to write large file in a streaming way. #[napi] pub async fn writer(&self, path: String) -> Result<Writer> { let w = self.0.writer(&path).await.map_err(format_napi_error)?; Ok(Writer(w)) } + /// Write multiple bytes into path synchronously. + /// + /// It could be used to write large file in a streaming way. #[napi] pub fn writer_sync(&self, path: String) -> Result<BlockingWriter> { let w = self.0.blocking().writer(&path).map_err(format_napi_error)?; @@ -568,6 +587,7 @@ impl Operator { } } +/// Entry returned by Lister or BlockingLister to represent a path and it's relative metadata. #[napi] pub struct Entry(opendal::Entry); @@ -580,6 +600,7 @@ impl Entry { } } +/// Metadata carries all metadata associated with a path. #[napi] pub struct Metadata(opendal::Metadata); @@ -636,6 +657,8 @@ impl Metadata { } } +/// BlockingReader is designed to read data from given path in an blocking +/// manner. #[napi] pub struct BlockingReader(opendal::BlockingReader); @@ -647,6 +670,26 @@ impl BlockingReader { } } +/// Reader is designed to read data from given path in an asynchronous +/// manner. +#[napi] +pub struct Reader(opendal::Reader); + +#[napi] +impl Reader { + /// # Safety + /// + /// > &mut self in async napi methods should be marked as unsafe + /// + /// Read bytes from this reader into given buffer. + #[napi] + pub async unsafe fn read(&mut self, mut buf: Buffer) -> Result<usize> { + self.0.read(buf.as_mut()).await.map_err(format_napi_error) + } +} + +/// BlockingWriter is designed to write data into given path in an blocking +/// manner. #[napi] pub struct BlockingWriter(opendal::BlockingWriter); @@ -692,9 +735,8 @@ impl BlockingWriter { } } -/// Writer -/// -/// Could be used to write bytes into a file. +/// Writer is designed to write data into given path in an asynchronous +/// manner. #[napi] pub struct Writer(opendal::Writer); @@ -739,6 +781,8 @@ impl Writer { } } +/// Lister is designed to list entries at given path in an asynchronous +/// manner. #[napi] pub struct Lister(opendal::Lister); @@ -761,6 +805,8 @@ impl Lister { } } +/// BlockingLister is designed to list entries at given path in a blocking +/// manner. #[napi] pub struct BlockingLister(opendal::BlockingLister); @@ -781,6 +827,7 @@ impl BlockingLister { } } +/// PresignedRequest is a presigned request return by `presign`. #[napi(object)] pub struct PresignedRequest { /// HTTP method of this request. diff --git a/bindings/nodejs/tests/suites/async.suite.mjs b/bindings/nodejs/tests/suites/async.suite.mjs index a80e9131e..b1196dda4 100644 --- a/bindings/nodejs/tests/suites/async.suite.mjs +++ b/bindings/nodejs/tests/suites/async.suite.mjs @@ -20,6 +20,8 @@ import { randomUUID } from 'node:crypto' import { test } from 'vitest' import { generateBytes, generateFixedBytes } from '../utils.mjs' +import { Readable, Writable } from 'node:stream' +import { finished, pipeline } from 'node:stream/promises' export function run(op) { describe('async tests', () => { @@ -33,20 +35,51 @@ export function run(op) { } }) - test.runIf(op.capability().write && op.capability().writeCanMulti)('async writer', async () => { + test.runIf(op.capability().write && op.capability().writeCanMulti)('reader/writer stream pipeline', async () => { const filename = `random_file_${randomUUID()}` - const writer = await op.writer(filename) + const buf = generateFixedBytes(5 * 1024 * 1024) + const rs = Readable.from(buf, { + highWaterMark: 5 * 1024 * 1024, // to buffer 5MB data to read + }) + const w = await op.writer(filename) + const ws = w.createWriteStream() + await pipeline(rs, ws) - const data = generateFixedBytes(5 * 1024 * 1024) + await finished(ws) - await writer.write(data) - await writer.write(data) - await writer.close() + const t = await op.stat(filename) + assert.equal(t.contentLength, buf.length) - const stat = await op.stat(filename) - assert.equal(stat.contentLength, data.length * 2) + const content = await op.read(filename) + assert.equal(Buffer.compare(content, buf), 0) // 0 means equal await op.delete(filename) }) + + test.runIf(op.capability().write)('read stream', async () => { + let c = generateFixedBytes(3 * 1024 * 1024) + const filename = `random_file_${randomUUID()}` + + await op.write(filename, c) + + const r = await op.reader(filename) + const rs = r.createReadStream() + let chunks = [] + await pipeline( + rs, + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk) + callback() + }, + }), + ) + + await finished(rs) + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, c), 0) + + op.deleteSync(filename) + }) }) } diff --git a/bindings/nodejs/tests/suites/sync.suite.mjs b/bindings/nodejs/tests/suites/sync.suite.mjs index 298f199a7..38c186d57 100644 --- a/bindings/nodejs/tests/suites/sync.suite.mjs +++ b/bindings/nodejs/tests/suites/sync.suite.mjs @@ -35,39 +35,45 @@ export function run(op) { } }) - test.runIf(op.capability().write && op.capability().writeCanMulti)('blocking writer stream', async () => { - const filename = `random_file_${randomUUID()}` - const buf = generateFixedBytes(5 * 1024 * 1024) - const r = Readable.from(buf, { - highWaterMark: 5 * 1024 * 1024, // to buffer 5MB data to read - }) - const w = op.createWriteStream(filename) - r.pipe(w) + test.runIf(op.capability().write && op.capability().writeCanMulti)( + 'blocking reader/writer stream pipeline', + async () => { + const filename = `random_file_${randomUUID()}` + const buf = generateFixedBytes(5 * 1024 * 1024) + const rs = Readable.from(buf, { + highWaterMark: 5 * 1024 * 1024, // to buffer 5MB data to read + }) + const w = op.writerSync(filename) + const ws = w.createWriteStream() + rs.pipe(ws) - w.on('finish', () => { - const t = op.statSync(filename) - assert.equal(t.contentLength, buf.length) + ws.on('finish', () => { + const t = op.statSync(filename) + assert.equal(t.contentLength, buf.length) - const content = op.readSync(filename) - assert.equal(Buffer.compare(content, buf), 0) // 0 means equal + const content = op.readSync(filename) + assert.equal(Buffer.compare(content, buf), 0) // 0 means equal - op.deleteSync(filename) - }) - }) + op.deleteSync(filename) + }) + }, + ) - test.runIf(op.capability().write)('read stream', async () => { + test.runIf(op.capability().write)('blocking read stream', async () => { let c = generateFixedBytes(3 * 1024 * 1024) const filename = `random_file_${randomUUID()}` await op.write(filename, c) - const r = op.createReadStream(filename) + const r = op.readerSync(filename) + const rs = r.createReadStream() + let chunks = [] - r.on('data', (chunk) => { + rs.on('data', (chunk) => { chunks.push(chunk) }) - r.on('end', () => { + rs.on('end', () => { const buf = Buffer.concat(chunks) assert.equal(Buffer.compare(buf, c), 0) diff --git a/bindings/nodejs/tests/utils.mjs b/bindings/nodejs/tests/utils.mjs index 657221371..bf685612e 100644 --- a/bindings/nodejs/tests/utils.mjs +++ b/bindings/nodejs/tests/utils.mjs @@ -18,7 +18,6 @@ */ import crypto from 'node:crypto' -import { Readable } from 'node:stream' // Generate random bytes between 1 and 1024 KB export function generateBytes() { diff --git a/bindings/nodejs/yarn.lock b/bindings/nodejs/yarn.lock index 40a9b1cbf..e601368ff 100644 --- a/bindings/nodejs/yarn.lock +++ b/bindings/nodejs/yarn.lock @@ -2687,7 +2687,7 @@ __metadata: languageName: node linkType: hard -"minimatch@npm:^9.0.0, minimatch@npm:^9.0.1": +"minimatch@npm:^9.0.1, minimatch@npm:^9.0.3": version: 9.0.3 resolution: "minimatch@npm:9.0.3" dependencies: @@ -2906,7 +2906,7 @@ __metadata: benny: ^3.7.1 dotenv: ^16.0.3 prettier: ^2.8.4 - typedoc: ^0.24 + typedoc: ^0.25 typescript: ^5.0.2 vitest: ^0.34.6 languageName: unknown @@ -3396,19 +3396,19 @@ __metadata: languageName: node linkType: hard -"typedoc@npm:^0.24": - version: 0.24.8 - resolution: "typedoc@npm:0.24.8" +"typedoc@npm:^0.25": + version: 0.25.4 + resolution: "typedoc@npm:0.25.4" dependencies: lunr: ^2.3.9 marked: ^4.3.0 - minimatch: ^9.0.0 + minimatch: ^9.0.3 shiki: ^0.14.1 peerDependencies: - typescript: 4.6.x || 4.7.x || 4.8.x || 4.9.x || 5.0.x || 5.1.x + typescript: 4.6.x || 4.7.x || 4.8.x || 4.9.x || 5.0.x || 5.1.x || 5.2.x || 5.3.x bin: typedoc: bin/typedoc - checksum: a46a14497f789fb3594e6c3af2e45276934ac46df40b7ed15a504ee51dc7a8013a2ffb3a54fd73abca6a2b71f97d3ec9ad356fa9aa81d29743e4645a965a2ae0 + checksum: 6d441baa277c0db4d577db2932a7af316d175415841e2faf2e68e3eda6ad60356c54f56374f89c5233d7bd5c057b0337455e5d484d8463e1445e67c37a6d94eb languageName: node linkType: hard
