This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 049ce8dde feat(bindings/nodejs): read/write stream (#3619)
049ce8dde is described below
commit 049ce8dde3b1d4fb9214f7b465287eb2cb16b56f
Author: Suyan <[email protected]>
AuthorDate: Mon Nov 27 20:49:55 2023 +0800
feat(bindings/nodejs): read/write stream (#3619)
Signed-off-by: suyanhanx <[email protected]>
---
bindings/nodejs/.prettierignore | 2 +-
bindings/nodejs/{index.d.ts => generated.d.ts} | 129 +++++
bindings/nodejs/generated.js | 6 +-
bindings/nodejs/index.d.ts | 624 +------------------------
bindings/nodejs/index.js | 118 ++++-
bindings/nodejs/package.json | 7 +-
bindings/nodejs/scripts/header.js | 2 +-
bindings/nodejs/src/lib.rs | 170 +++++++
bindings/nodejs/tests/suites/async.suite.mjs | 68 ++-
bindings/nodejs/tests/suites/index.mjs | 6 +-
bindings/nodejs/tests/suites/sync.suite.mjs | 67 ++-
bindings/nodejs/tests/utils.mjs | 13 +-
bindings/nodejs/vitest.config.js | 1 +
bindings/nodejs/yarn.lock | 16 +-
core/src/services/swift/core.rs | 3 +-
core/tests/behavior/main.rs | 3 +-
16 files changed, 590 insertions(+), 645 deletions(-)
diff --git a/bindings/nodejs/.prettierignore b/bindings/nodejs/.prettierignore
index 3e7db534d..8e6a17e88 100644
--- a/bindings/nodejs/.prettierignore
+++ b/bindings/nodejs/.prettierignore
@@ -1,4 +1,4 @@
target
generated.js
-index.d.ts
+generated.d.ts
.yarn
diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/generated.d.ts
similarity index 84%
copy from bindings/nodejs/index.d.ts
copy to bindings/nodejs/generated.d.ts
index 0ef6a3e94..c5e1d1ba8 100644
--- a/bindings/nodejs/index.d.ts
+++ b/bindings/nodejs/generated.d.ts
@@ -28,6 +28,7 @@ export class ExternalObject<T> {
[K: symbol]: T
}
}
+/** PresignedRequest is a presigned request returned by `presign`. */
export interface PresignedRequest {
/** HTTP method of this request. */
method: string
@@ -239,6 +240,12 @@ export class Operator {
* ```
*/
read(path: string): Promise<Buffer>
+ /**
+ * Create a reader to read the given path.
+ *
+ * It could be used to read large file in a streaming way.
+ */
+ reader(path: string): Promise<Reader>
/**
* Read the whole path into a buffer synchronously.
*
@@ -248,6 +255,12 @@ export class Operator {
* ```
*/
readSync(path: string): Buffer
+ /**
+ * Create a reader to read the given path synchronously.
+ *
+ * It could be used to read large file in a streaming way.
+ */
+ readerSync(path: string): BlockingReader
/**
* Write bytes into path.
*
@@ -259,6 +272,18 @@ export class Operator {
* ```
*/
write(path: string, content: Buffer | string): Promise<void>
+ /**
+ * Write multiple bytes into path.
+ *
+ * It could be used to write large file in a streaming way.
+ */
+ writer(path: string): Promise<Writer>
+ /**
+ * Write multiple bytes into path synchronously.
+ *
+ * It could be used to write large file in a streaming way.
+ */
+ writerSync(path: string): BlockingWriter
/**
* Write bytes into path synchronously.
*
@@ -515,10 +540,12 @@ export class Operator {
/** Add a layer to this operator. */
layer(layer: ExternalObject<Layer>): this
}
+/** Entry returned by Lister or BlockingLister to represent a path and its
related metadata. */
export class Entry {
/** Return the path of this entry. */
path(): string
}
+/** Metadata carries all metadata associated with a path. */
export class Metadata {
/** Returns true if the <op.stat> object describes a file system directory.
*/
isDirectory(): boolean
@@ -541,6 +568,104 @@ export class Metadata {
*/
get lastModified(): string | null
}
+/**
+ * BlockingReader is designed to read data from given path in a blocking
+ * manner.
+ */
+export class BlockingReader {
+ read(buf: Buffer): bigint
+}
+/**
+ * Reader is designed to read data from given path in an asynchronous
+ * manner.
+ */
+export class Reader {
+ /**
+ * # Safety
+ *
+ * > &mut self in async napi methods should be marked as unsafe
+ *
+ * Read bytes from this reader into given buffer.
+ */
+ read(buf: Buffer): Promise<bigint>
+}
+/**
+ * BlockingWriter is designed to write data into given path in a blocking
+ * manner.
+ */
+export class BlockingWriter {
+ /**
+ * # Safety
+ *
+ * > &mut self in async napi methods should be marked as unsafe
+ *
+ * Write bytes into this writer.
+ *
+ * ### Example
+ * ```javascript
+ * const writer = op.writerSync("path/to/file");
+ * writer.write(Buffer.from("hello world"));
+ * writer.close();
+ * ```
+ */
+ write(content: Buffer | string): void
+ /**
+ * # Safety
+ *
+ * > &mut self in async napi methods should be marked as unsafe
+ *
+ * Close this writer.
+ *
+ * ### Example
+ *
+ * ```javascript
+ * const writer = op.writerSync("path/to/file");
+ * writer.write(Buffer.from("hello world"));
+ * writer.close();
+ * ```
+ */
+ close(): void
+}
+/**
+ * Writer is designed to write data into given path in an asynchronous
+ * manner.
+ */
+export class Writer {
+ /**
+ * # Safety
+ *
+ * > &mut self in async napi methods should be marked as unsafe
+ *
+ * Write bytes into this writer.
+ *
+ * ### Example
+ * ```javascript
+ * const writer = await op.writer("path/to/file");
+ * await writer.write(Buffer.from("hello world"));
+ * await writer.close();
+ * ```
+ */
+ write(content: Buffer | string): Promise<void>
+ /**
+ * # Safety
+ *
+ * > &mut self in async napi methods should be marked as unsafe
+ *
+ * Close this writer.
+ *
+ * ### Example
+ * ```javascript
+ * const writer = await op.writer("path/to/file");
+ * await writer.write(Buffer.from("hello world"));
+ * await writer.close();
+ * ```
+ */
+ close(): Promise<void>
+}
+/**
+ * Lister is designed to list entries at given path in an asynchronous
+ * manner.
+ */
export class Lister {
/**
* # Safety
@@ -552,6 +677,10 @@ export class Lister {
*/
next(): Promise<Entry | null>
}
+/**
+ * BlockingLister is designed to list entries at given path in a blocking
+ * manner.
+ */
export class BlockingLister {
next(): Entry | null
}
diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js
index b21cd679e..385177b58 100644
--- a/bindings/nodejs/generated.js
+++ b/bindings/nodejs/generated.js
@@ -271,12 +271,16 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
-const { Capability, Operator, Entry, Metadata, Lister, BlockingLister, Layer,
RetryLayer } = nativeBinding
+const { Capability, Operator, Entry, Metadata, BlockingReader, Reader,
BlockingWriter, Writer, Lister, BlockingLister, Layer, RetryLayer } =
nativeBinding
module.exports.Capability = Capability
module.exports.Operator = Operator
module.exports.Entry = Entry
module.exports.Metadata = Metadata
+module.exports.BlockingReader = BlockingReader
+module.exports.Reader = Reader
+module.exports.BlockingWriter = BlockingWriter
+module.exports.Writer = Writer
module.exports.Lister = Lister
module.exports.BlockingLister = BlockingLister
module.exports.Layer = Layer
diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts
index 0ef6a3e94..8c22b10e1 100644
--- a/bindings/nodejs/index.d.ts
+++ b/bindings/nodejs/index.d.ts
@@ -17,611 +17,27 @@
* under the License.
*/
-/* tslint:disable */
-/* eslint-disable */
+import { Readable, ReadableOptions, Writable, WritableOptions } from
'node:stream'
-/* auto-generated by NAPI-RS */
+declare module './generated' {
+ interface BlockingReader {
+ /** Create a readable stream from the underlying reader. */
+ createReadStream(options?: ReadableOptions): Readable
+ }
+ interface Reader {
+ /** Create a readable stream from the underlying reader. */
+ createReadStream(options?: ReadableOptions): Readable
+ }
-export class ExternalObject<T> {
- readonly '': {
- readonly '': unique symbol
- [K: symbol]: T
+ interface BlockingWriter {
+ /** Create a writable stream from the underlying writer. */
+ createWriteStream(options?: WritableOptions): Writable
+ }
+
+ interface Writer {
+ /** Create a writable stream from the underlying writer. */
+ createWriteStream(options?: WritableOptions): Writable
}
}
-export interface PresignedRequest {
- /** HTTP method of this request. */
- method: string
- /** URL of this request. */
- url: string
- /** HTTP headers of this request. */
- headers: Record<string, string>
-}
-/**
- * Capability is used to describe what operations are supported
- * by current Operator.
- *
- * Via capability, we can know:
- *
- * - Whether current Operator supports read or not.
- * - Whether current Operator supports read with if match or not.
- * - What's current Operator max supports batch operations count.
- *
- * Add fields of Capabilities with be public and can be accessed directly.
- */
-export class Capability {
- /** If operator supports stat. */
- get stat(): boolean
- /** If operator supports stat with if match. */
- get statWithIfMatch(): boolean
- /** If operator supports stat with if none match. */
- get statWithIfNoneMatch(): boolean
- /** If operator supports read. */
- get read(): boolean
- /** If operator supports seek on returning reader. */
- get readCanSeek(): boolean
- /** If operator supports next on returning reader. */
- get readCanNext(): boolean
- /** If operator supports read with range. */
- get readWithRange(): boolean
- /** If operator supports read with if match. */
- get readWithIfMatch(): boolean
- /** If operator supports read with if none match. */
- get readWithIfNoneMatch(): boolean
- /** if operator supports read with override cache control. */
- get readWithOverrideCacheControl(): boolean
- /** if operator supports read with override content disposition. */
- get readWithOverrideContentDisposition(): boolean
- /** if operator supports read with override content type. */
- get readWithOverrideContentType(): boolean
- /** If operator supports write. */
- get write(): boolean
- /** If operator supports write can be called in multi times. */
- get writeCanMulti(): boolean
- /** If operator supports write with empty content. */
- get writeCanEmpty(): boolean
- /** If operator supports write by append. */
- get writeCanAppend(): boolean
- /** If operator supports write with content type. */
- get writeWithContentType(): boolean
- /** If operator supports write with content disposition. */
- get writeWithContentDisposition(): boolean
- /** If operator supports write with cache control. */
- get writeWithCacheControl(): boolean
- /**
- * write_multi_max_size is the max size that services support in write_multi.
- *
- * For example, AWS S3 supports 5GiB as max in write_multi.
- */
- get writeMultiMaxSize(): bigint | null
- /**
- * write_multi_min_size is the min size that services support in write_multi.
- *
- * For example, AWS S3 requires at least 5MiB in write_multi expect the last
one.
- */
- get writeMultiMinSize(): bigint | null
- /**
- * write_multi_align_size is the align size that services required in
write_multi.
- *
- * For example, Google GCS requires align size to 256KiB in write_multi.
- */
- get writeMultiAlignSize(): bigint | null
- /**
- * write_total_max_size is the max size that services support in write_total.
- *
- * For example, Cloudflare D1 supports 1MB as max in write_total.
- */
- get writeTotalMaxSize(): bigint | null
- /** If operator supports create dir. */
- get createDir(): boolean
- /** If operator supports delete. */
- get delete(): boolean
- /** If operator supports copy. */
- get copy(): boolean
- /** If operator supports rename. */
- get rename(): boolean
- /** If operator supports list. */
- get list(): boolean
- /** If backend supports list with limit. */
- get listWithLimit(): boolean
- /** If backend supports list with start after. */
- get listWithStartAfter(): boolean
- /** If backend supports list with recursive. */
- get listWithRecursive(): boolean
- /** If backend supports list without recursive. */
- get listWithoutRecursive(): boolean
- /** If operator supports presign. */
- get presign(): boolean
- /** If operator supports presign read. */
- get presignRead(): boolean
- /** If operator supports presign stat. */
- get presignStat(): boolean
- /** If operator supports presign write. */
- get presignWrite(): boolean
- /** If operator supports batch. */
- get batch(): boolean
- /** If operator supports batch delete. */
- get batchDelete(): boolean
- /** The max operations that operator supports in batch. */
- get batchMaxOperations(): bigint | null
- /** If operator supports blocking. */
- get blocking(): boolean
-}
-export class Operator {
- /** @see For a detailed definition of scheme, see
https://opendal.apache.org/docs/category/services */
- constructor(scheme: string, options?: Record<string, string> | undefined |
null)
- /** Get current operator(service)'s full capability. */
- capability(): Capability
- /**
- * Get current path's metadata **without cache** directly.
- *
- * ### Notes
- * Use stat if you:
- *
- * - Want detect the outside changes of path.
- * - Don’t want to read from cached metadata.
- *
- * You may want to use `metadata` if you are working with entries returned
by `Lister`. It’s highly possible that metadata you want has already been
cached.
- *
- * ### Example
- * ```javascript
- * const meta = await op.stat("test");
- * if (meta.isDir) {
- * // do something
- * }
- * ```
- */
- stat(path: string): Promise<Metadata>
- /**
- * Get current path's metadata **without cache** directly and synchronously.
- *
- * ### Example
- * ```javascript
- * const meta = op.statSync("test");
- * if (meta.isDir) {
- * // do something
- * }
- * ```
- */
- statSync(path: string): Metadata
- /**
- * Check if this operator can work correctly.
- *
- * We will send a `list` request to path and return any errors we met.
- *
- * ### Example
- * ```javascript
- * await op.check();
- * ```
- */
- check(): Promise<void>
- /**
- * Check if this path exists or not.
- *
- * ### Example
- * ```javascript
- * await op.isExist("test");
- * ```
- */
- isExist(path: string): Promise<boolean>
- /**
- * Check if this path exists or not synchronously.
- *
- * ### Example
- * ```javascript
- * op.isExistSync("test");
- * ```
- */
- isExistSync(path: string): boolean
- /**
- * Create dir with given path.
- *
- * ### Example
- * ```javascript
- * await op.createDir("path/to/dir/");
- * ```
- */
- createDir(path: string): Promise<void>
- /**
- * Create dir with given path synchronously.
- *
- * ### Example
- * ```javascript
- * op.createDirSync("path/to/dir/");
- * ```
- */
- createDirSync(path: string): void
- /**
- * Read the whole path into a buffer.
- *
- * ### Example
- * ```javascript
- * const buf = await op.read("path/to/file");
- * ```
- */
- read(path: string): Promise<Buffer>
- /**
- * Read the whole path into a buffer synchronously.
- *
- * ### Example
- * ```javascript
- * const buf = op.readSync("path/to/file");
- * ```
- */
- readSync(path: string): Buffer
- /**
- * Write bytes into path.
- *
- * ### Example
- * ```javascript
- * await op.write("path/to/file", Buffer.from("hello world"));
- * // or
- * await op.write("path/to/file", "hello world");
- * ```
- */
- write(path: string, content: Buffer | string): Promise<void>
- /**
- * Write bytes into path synchronously.
- *
- * ### Example
- * ```javascript
- * op.writeSync("path/to/file", Buffer.from("hello world"));
- * // or
- * op.writeSync("path/to/file", "hello world");
- * ```
- */
- writeSync(path: string, content: Buffer | string): void
- /**
- * Append bytes into path.
- *
- * ### Notes
- *
- * - It always appends content to the end of the file.
- * - It will create file if the path not exists.
- *
- * ### Example
- * ```javascript
- * await op.append("path/to/file", Buffer.from("hello world"));
- * // or
- * await op.append("path/to/file", "hello world");
- * ```
- */
- append(path: string, content: Buffer | string): Promise<void>
- /**
- * Copy file according to given `from` and `to` path.
- *
- * ### Example
- * ```javascript
- * await op.copy("path/to/file", "path/to/dest");
- * ```
- */
- copy(from: string, to: string): Promise<void>
- /**
- * Copy file according to given `from` and `to` path synchronously.
- *
- * ### Example
- * ```javascript
- * op.copySync("path/to/file", "path/to/dest");
- * ```
- */
- copySync(from: string, to: string): void
- /**
- * Rename file according to given `from` and `to` path.
- *
- * It's similar to `mv` command.
- *
- * ### Example
- * ```javascript
- * await op.rename("path/to/file", "path/to/dest");
- * ```
- */
- rename(from: string, to: string): Promise<void>
- /**
- * Rename file according to given `from` and `to` path synchronously.
- *
- * It's similar to `mv` command.
- *
- * ### Example
- * ```javascript
- * op.renameSync("path/to/file", "path/to/dest");
- * ```
- */
- renameSync(from: string, to: string): void
- /**
- * List dir in flat way.
- *
- * This function will create a new handle to list entries.
- *
- * An error will be returned if given path doesn't end with /.
- *
- * ### Example
- *
- * ```javascript
- * const lister = await op.scan("/path/to/dir/");
- * while (true) {
- * const entry = await lister.next();
- * if (entry === null) {
- * break;
- * }
- * let meta = await op.stat(entry.path);
- * if (meta.is_file) {
- * // do something
- * }
- * }
- * `````
- */
- scan(path: string): Promise<Lister>
- /**
- * List dir in flat way synchronously.
- *
- * This function will create a new handle to list entries.
- *
- * An error will be returned if given path doesn't end with /.
- *
- * ### Example
- * ```javascript
- * const lister = op.scan_sync(/path/to/dir/");
- * while (true) {
- * const entry = lister.next();
- * if (entry === null) {
- * break;
- * }
- * let meta = op.statSync(entry.path);
- * if (meta.is_file) {
- * // do something
- * }
- * }
- * `````
- */
- scanSync(path: string): BlockingLister
- /**
- * Delete the given path.
- *
- * ### Notes
- * Delete not existing error won’t return errors.
- *
- * ### Example
- * ```javascript
- * await op.delete("test");
- * ```
- */
- delete(path: string): Promise<void>
- /**
- * Delete the given path synchronously.
- *
- * ### Example
- * ```javascript
- * op.deleteSync("test");
- * ```
- */
- deleteSync(path: string): void
- /**
- * Remove given paths.
- *
- * ### Notes
- * If underlying services support delete in batch, we will use batch delete
instead.
- *
- * ### Examples
- * ```javascript
- * await op.remove(["abc", "def"]);
- * ```
- */
- remove(paths: Array<string>): Promise<void>
- /**
- * Remove the path and all nested dirs and files recursively.
- *
- * ### Notes
- * If underlying services support delete in batch, we will use batch delete
instead.
- *
- * ### Examples
- * ```javascript
- * await op.removeAll("path/to/dir/");
- * ```
- */
- removeAll(path: string): Promise<void>
- /**
- * List given path.
- *
- * This function will create a new handle to list entries.
- *
- * An error will be returned if given path doesn't end with `/`.
- *
- * ### Example
- * ```javascript
- * const lister = await op.list("path/to/dir/");
- * while (true) {
- * const entry = await lister.next();
- * if (entry === null) {
- * break;
- * }
- * let meta = await op.stat(entry.path);
- * if (meta.isFile) {
- * // do something
- * }
- * }
- * ```
- */
- list(path: string): Promise<Lister>
- /**
- * List given path synchronously.
- *
- * This function will create a new handle to list entries.
- *
- * An error will be returned if given path doesn't end with `/`.
- *
- * ### Example
- * ```javascript
- * const lister = op.listSync("path/to/dir/");
- * while (true) {
- * const entry = lister.next();
- * if (entry === null) {
- * break;
- * }
- * let meta = op.statSync(entry.path);
- * if (meta.isFile) {
- * // do something
- * }
- * }
- * ```
- */
- listSync(path: string): BlockingLister
- /**
- * Get a presigned request for read.
- *
- * Unit of expires is seconds.
- *
- * ### Example
- *
- * ```javascript
- * const req = await op.presignRead(path, parseInt(expires));
- *
- * console.log("method: ", req.method);
- * console.log("url: ", req.url);
- * console.log("headers: ", req.headers);
- * ```
- */
- presignRead(path: string, expires: number): Promise<PresignedRequest>
- /**
- * Get a presigned request for write.
- *
- * Unit of expires is seconds.
- *
- * ### Example
- *
- * ```javascript
- * const req = await op.presignWrite(path, parseInt(expires));
- *
- * console.log("method: ", req.method);
- * console.log("url: ", req.url);
- * console.log("headers: ", req.headers);
- * ```
- */
- presignWrite(path: string, expires: number): Promise<PresignedRequest>
- /**
- * Get a presigned request for stat.
- *
- * Unit of expires is seconds.
- *
- * ### Example
- *
- * ```javascript
- * const req = await op.presignStat(path, parseInt(expires));
- *
- * console.log("method: ", req.method);
- * console.log("url: ", req.url);
- * console.log("headers: ", req.headers);
- * ```
- */
- presignStat(path: string, expires: number): Promise<PresignedRequest>
- /** Add a layer to this operator. */
- layer(layer: ExternalObject<Layer>): this
-}
-export class Entry {
- /** Return the path of this entry. */
- path(): string
-}
-export class Metadata {
- /** Returns true if the <op.stat> object describes a file system directory.
*/
- isDirectory(): boolean
- /** Returns true if the <op.stat> object describes a regular file. */
- isFile(): boolean
- /** Content-Disposition of this object */
- get contentDisposition(): string | null
- /** Content Length of this object */
- get contentLength(): bigint | null
- /** Content MD5 of this object. */
- get contentMd5(): string | null
- /** Content Type of this object. */
- get contentType(): string | null
- /** ETag of this object. */
- get etag(): string | null
- /**
- * Last Modified of this object.
- *
- * We will output this time in RFC3339 format like
`1996-12-19T16:39:57+08:00`.
- */
- get lastModified(): string | null
-}
-export class Lister {
- /**
- * # Safety
- *
- * > &mut self in async napi methods should be marked as unsafe
- *
- * napi will make sure the function is safe, and we didn't do unsafe
- * thing internally.
- */
- next(): Promise<Entry | null>
-}
-export class BlockingLister {
- next(): Entry | null
-}
-/** A public layer wrapper */
-export class Layer { }
-/**
- * Retry layer
- *
- * Add retry for temporary failed operations.
- *
- * # Notes
- *
- * This layer will retry failed operations when [`Error::is_temporary`]
- * returns true. If operation still failed, this layer will set error to
- * `Persistent` which means error has been retried.
- *
- * `write` and `blocking_write` don't support retry so far, visit [this
issue](https://github.com/apache/incubator-opendal/issues/1223) for more
details.
- *
- * # Examples
- *
- * ```javascript
- * const op = new Operator("file", { root: "/tmp" })
- *
- * const retry = new RetryLayer();
- * retry.max_times = 3;
- * retry.jitter = true;
- *
- * op.layer(retry.build());
- * ```
- */
-export class RetryLayer {
- constructor()
- /**
- * Set jitter of current backoff.
- *
- * If jitter is enabled, ExponentialBackoff will add a random jitter in `[0,
min_delay)
- * to current delay.
- */
- set jitter(v: boolean)
- /**
- * Set max_times of current backoff.
- *
- * Backoff will return `None` if max times is reaching.
- */
- set maxTimes(v: number)
- /**
- * Set factor of current backoff.
- *
- * # Panics
- *
- * This function will panic if input factor smaller than `1.0`.
- */
- set factor(v: number)
- /**
- * Set max_delay of current backoff.
- *
- * Delay will not increasing if current delay is larger than max_delay.
- *
- * # Notes
- *
- * - The unit of max_delay is millisecond.
- */
- set maxDelay(v: number)
- /**
- * Set min_delay of current backoff.
- *
- * # Notes
- *
- * - The unit of min_delay is millisecond.
- */
- set minDelay(v: number)
- build(): ExternalObject<Layer>
-}
+
+export * from './generated'
diff --git a/bindings/nodejs/index.js b/bindings/nodejs/index.js
index c1af66747..22ef628e0 100644
--- a/bindings/nodejs/index.js
+++ b/bindings/nodejs/index.js
@@ -19,7 +19,123 @@
/// <reference types="node" />
-const { Operator, RetryLayer } = require('./generated.js')
+const { Writable, Readable } = require('node:stream')
+
+class ReadStream extends Readable {
+ constructor(reader, options) {
+ super(options)
+ this.reader = reader
+ }
+
+ _read(size) {
+ const buf = Buffer.alloc(size)
+ this.reader
+ .read(buf)
+ .then((s) => {
+ if (s === 0n) {
+ this.push(null)
+ } else {
+ this.push(buf.subarray(0, Number(s)))
+ }
+ })
+ .catch((e) => {
+ this.emit('error', e)
+ })
+ }
+}
+
+class BlockingReadStream extends Readable {
+ constructor(reader, options) {
+ super(options)
+ this.reader = reader
+ }
+
+ _read(size) {
+ try {
+ const buf = Buffer.alloc(size)
+ let s = this.reader.read(buf)
+ if (s === 0n) {
+ this.push(null)
+ } else {
+ this.push(buf.subarray(0, Number(s)))
+ }
+ } catch (e) {
+ this.emit('error', e)
+ }
+ }
+}
+
+class WriteStream extends Writable {
+ constructor(writer, options) {
+ super(options)
+ this.writer = writer
+ }
+
+ _write(chunk, encoding, callback) {
+ this.writer
+ .write(chunk)
+ .then(() => {
+ callback()
+ })
+ .catch((e) => {
+ callback(e)
+ })
+ }
+
+ _final(callback) {
+ this.writer
+ .close()
+ .then(() => {
+ callback()
+ })
+ .catch((e) => {
+ callback(e)
+ })
+ }
+}
+
+class BlockingWriteStream extends Writable {
+ constructor(writer, options) {
+ super(options)
+ this.writer = writer
+ }
+
+ _write(chunk, encoding, callback) {
+ try {
+ this.writer.write(chunk)
+ callback()
+ } catch (e) {
+ callback(e)
+ }
+ }
+
+ _final(callback) {
+ try {
+ this.writer.close()
+ callback()
+ } catch (e) {
+ callback(e)
+ }
+ }
+}
+
+const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer }
= require('./generated.js')
+
+BlockingReader.prototype.createReadStream = function (options) {
+ return new BlockingReadStream(this, options)
+}
+
+Reader.prototype.createReadStream = function (options) {
+ return new ReadStream(this, options)
+}
+
+BlockingWriter.prototype.createWriteStream = function (options) {
+ return new BlockingWriteStream(this, options)
+}
+
+Writer.prototype.createWriteStream = function (options) {
+ return new WriteStream(this, options)
+}
module.exports.Operator = Operator
module.exports.layers = {
diff --git a/bindings/nodejs/package.json b/bindings/nodejs/package.json
index 366172015..8e7b8739a 100644
--- a/bindings/nodejs/package.json
+++ b/bindings/nodejs/package.json
@@ -41,6 +41,7 @@
"files": [
"index.d.ts",
"index.js",
+ "generated.d.ts",
"generated.js",
"LICENSE",
"NOTICE"
@@ -55,7 +56,7 @@
"benny": "^3.7.1",
"dotenv": "^16.0.3",
"prettier": "^2.8.4",
- "typedoc": "^0.24",
+ "typedoc": "^0.25",
"typescript": "^5.0.2",
"vitest": "^0.34.6"
},
@@ -63,8 +64,8 @@
"node": ">= 10"
},
"scripts": {
- "build": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target
\"${NAPI_TARGET:-}\" --release --js generated.js && node ./scripts/header.js",
- "build:debug": "napi build --platform --features \"${NAPI_FEATURES:-}\"
--target \"${NAPI_TARGET:-}\" --js generated.js && node ./scripts/header.js",
+ "build": "napi build --platform --features \"${NAPI_FEATURES:-}\" --target
\"${NAPI_TARGET:-}\" --release --js generated.js --dts generated.d.ts && node
./scripts/header.js",
+ "build:debug": "napi build --platform --features \"${NAPI_FEATURES:-}\"
--target \"${NAPI_TARGET:-}\" --js generated.js --dts generated.d.ts && node
./scripts/header.js",
"docs": "typedoc",
"format": "prettier --write .",
"test": "vitest",
diff --git a/bindings/nodejs/scripts/header.js
b/bindings/nodejs/scripts/header.js
index fd10f52b6..6d15af5c4 100644
--- a/bindings/nodejs/scripts/header.js
+++ b/bindings/nodejs/scripts/header.js
@@ -19,7 +19,7 @@
const fs = require('fs')
-let files = ['generated.js', 'index.d.ts']
+let files = ['generated.js', 'generated.d.ts']
for (path of files) {
let data = fs.readFileSync(path, 'utf8')
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index e243ce8b1..bd3525bfc 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -24,6 +24,9 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
+use opendal::raw::oio::BlockingRead;
+use opendal::raw::oio::ReadExt;
+
use futures::TryStreamExt;
use napi::bindgen_prelude::*;
@@ -176,6 +179,15 @@ impl Operator {
Ok(res.into())
}
+ /// Create a reader to read the given path.
+ ///
+ /// It could be used to read large file in a streaming way.
+ #[napi]
+ pub async fn reader(&self, path: String) -> Result<Reader> {
+ let r = self.0.reader(&path).await.map_err(format_napi_error)?;
+ Ok(Reader(r))
+ }
+
/// Read the whole path into a buffer synchronously.
///
/// ### Example
@@ -188,6 +200,15 @@ impl Operator {
Ok(res.into())
}
+ /// Create a reader to read the given path synchronously.
+ ///
+ /// It could be used to read large file in a streaming way.
+ #[napi]
+ pub fn reader_sync(&self, path: String) -> Result<BlockingReader> {
+ let r = self.0.blocking().reader(&path).map_err(format_napi_error)?;
+ Ok(BlockingReader(r))
+ }
+
/// Write bytes into path.
///
/// ### Example
@@ -205,6 +226,24 @@ impl Operator {
self.0.write(&path, c).await.map_err(format_napi_error)
}
+ /// Write multiple bytes into path.
+ ///
+ /// It could be used to write large file in a streaming way.
+ #[napi]
+ pub async fn writer(&self, path: String) -> Result<Writer> {
+ let w = self.0.writer(&path).await.map_err(format_napi_error)?;
+ Ok(Writer(w))
+ }
+
+ /// Write multiple bytes into path synchronously.
+ ///
+ /// It could be used to write large file in a streaming way.
+ #[napi]
+ pub fn writer_sync(&self, path: String) -> Result<BlockingWriter> {
+ let w = self.0.blocking().writer(&path).map_err(format_napi_error)?;
+ Ok(BlockingWriter(w))
+ }
+
/// Write bytes into path synchronously.
///
/// ### Example
@@ -548,6 +587,7 @@ impl Operator {
}
}
+/// Entry returned by Lister or BlockingLister to represent a path and its
related metadata.
#[napi]
pub struct Entry(opendal::Entry);
@@ -560,6 +600,7 @@ impl Entry {
}
}
+/// Metadata carries all metadata associated with a path.
#[napi]
pub struct Metadata(opendal::Metadata);
@@ -616,6 +657,132 @@ impl Metadata {
}
}
+/// BlockingReader is designed to read data from given path in a blocking
+/// manner.
+#[napi]
+pub struct BlockingReader(opendal::BlockingReader);
+
+#[napi]
+impl BlockingReader {
+ #[napi]
+ pub fn read(&mut self, mut buf: Buffer) -> Result<usize> {
+ self.0.read(buf.as_mut()).map_err(format_napi_error)
+ }
+}
+
+/// Reader is designed to read data from given path in an asynchronous
+/// manner.
+#[napi]
+pub struct Reader(opendal::Reader);
+
+#[napi]
+impl Reader {
+ /// # Safety
+ ///
+ /// > &mut self in async napi methods should be marked as unsafe
+ ///
+ /// Read bytes from this reader into given buffer.
+ #[napi]
+ pub async unsafe fn read(&mut self, mut buf: Buffer) -> Result<usize> {
+ self.0.read(buf.as_mut()).await.map_err(format_napi_error)
+ }
+}
+
+/// BlockingWriter is designed to write data into given path in a blocking
+/// manner.
+#[napi]
+pub struct BlockingWriter(opendal::BlockingWriter);
+
+#[napi]
+impl BlockingWriter {
+ /// # Safety
+ ///
+ /// > &mut self in async napi methods should be marked as unsafe
+ ///
+ /// Write bytes into this writer.
+ ///
+ /// ### Example
+ /// ```javascript
+ /// const writer = op.writerSync("path/to/file");
+ /// writer.write(Buffer.from("hello world"));
+ /// writer.close();
+ /// ```
+ #[napi]
+ pub unsafe fn write(&mut self, content: Either<Buffer, String>) ->
Result<()> {
+ let c = match content {
+ Either::A(buf) => buf.as_ref().to_owned(),
+ Either::B(s) => s.into_bytes(),
+ };
+ self.0.write(c).map_err(format_napi_error)
+ }
+
+ /// # Safety
+ ///
+ /// > &mut self in async napi methods should be marked as unsafe
+ ///
+ /// Close this writer.
+ ///
+ /// ### Example
+ ///
+ /// ```javascript
+ /// const writer = op.writerSync("path/to/file");
+ /// writer.write(Buffer.from("hello world"));
+ /// writer.close();
+ /// ```
+ #[napi]
+ pub unsafe fn close(&mut self) -> Result<()> {
+ self.0.close().map_err(format_napi_error)
+ }
+}
+
+/// Writer is designed to write data into given path in an asynchronous
+/// manner.
+#[napi]
+pub struct Writer(opendal::Writer);
+
+#[napi]
+impl Writer {
+ /// # Safety
+ ///
+ /// > &mut self in async napi methods should be marked as unsafe
+ ///
+ /// Write bytes into this writer.
+ ///
+ /// ### Example
+ /// ```javascript
+ /// const writer = await op.writer("path/to/file");
+ /// await writer.write(Buffer.from("hello world"));
+ /// await writer.close();
+ /// ```
+ #[napi]
+ pub async unsafe fn write(&mut self, content: Either<Buffer, String>) ->
Result<()> {
+ let c = match content {
+ Either::A(buf) => buf.as_ref().to_owned(),
+ Either::B(s) => s.into_bytes(),
+ };
+ self.0.write(c).await.map_err(format_napi_error)
+ }
+
+ /// # Safety
+ ///
+ /// > &mut self in async napi methods should be marked as unsafe
+ ///
+ /// Close this writer.
+ ///
+ /// ### Example
+ /// ```javascript
+ /// const writer = await op.writer("path/to/file");
+ /// await writer.write(Buffer.from("hello world"));
+ /// await writer.close();
+ /// ```
+ #[napi]
+ pub async unsafe fn close(&mut self) -> Result<()> {
+ self.0.close().await.map_err(format_napi_error)
+ }
+}
+
+/// Lister is designed to list entries at given path in an asynchronous
+/// manner.
#[napi]
pub struct Lister(opendal::Lister);
@@ -638,6 +805,8 @@ impl Lister {
}
}
+/// BlockingLister is designed to list entries at given path in a blocking
+/// manner.
#[napi]
pub struct BlockingLister(opendal::BlockingLister);
@@ -658,6 +827,7 @@ impl BlockingLister {
}
}
+/// PresignedRequest is a presigned request returned by `presign`.
#[napi(object)]
pub struct PresignedRequest {
/// HTTP method of this request.
diff --git a/bindings/nodejs/tests/suites/async.suite.mjs b/bindings/nodejs/tests/suites/async.suite.mjs
index 4e6545661..b1196dda4 100644
--- a/bindings/nodejs/tests/suites/async.suite.mjs
+++ b/bindings/nodejs/tests/suites/async.suite.mjs
@@ -19,15 +19,67 @@
import { randomUUID } from 'node:crypto'
import { test } from 'vitest'
+import { generateBytes, generateFixedBytes } from '../utils.mjs'
+import { Readable, Writable } from 'node:stream'
+import { finished, pipeline } from 'node:stream/promises'
-export function run(operator) {
- test('async stat not exist files', async () => {
- const filename = `random_file_${randomUUID()}`
+export function run(op) {
+ describe('async tests', () => {
+ test('async stat not exist files', async () => {
+ const filename = `random_file_${randomUUID()}`
- try {
- await operator.stat(filename)
- } catch (error) {
- assert.include(error.message, 'NotFound')
- }
+ try {
+ await op.stat(filename)
+ } catch (error) {
+ assert.include(error.message, 'NotFound')
+ }
+ })
+
+ test.runIf(op.capability().write && op.capability().writeCanMulti)('reader/writer stream pipeline', async () => {
+ const filename = `random_file_${randomUUID()}`
+ const buf = generateFixedBytes(5 * 1024 * 1024)
+ const rs = Readable.from(buf, {
+ highWaterMark: 5 * 1024 * 1024, // to buffer 5MB data to read
+ })
+ const w = await op.writer(filename)
+ const ws = w.createWriteStream()
+ await pipeline(rs, ws)
+
+ await finished(ws)
+
+ const t = await op.stat(filename)
+ assert.equal(t.contentLength, buf.length)
+
+ const content = await op.read(filename)
+ assert.equal(Buffer.compare(content, buf), 0) // 0 means equal
+
+ await op.delete(filename)
+ })
+
+ test.runIf(op.capability().write)('read stream', async () => {
+ let c = generateFixedBytes(3 * 1024 * 1024)
+ const filename = `random_file_${randomUUID()}`
+
+ await op.write(filename, c)
+
+ const r = await op.reader(filename)
+ const rs = r.createReadStream()
+ let chunks = []
+ await pipeline(
+ rs,
+ new Writable({
+ write(chunk, encoding, callback) {
+ chunks.push(chunk)
+ callback()
+ },
+ }),
+ )
+
+ await finished(rs)
+ const buf = Buffer.concat(chunks)
+ assert.equal(Buffer.compare(buf, c), 0)
+
+ op.deleteSync(filename)
+ })
})
}
diff --git a/bindings/nodejs/tests/suites/index.mjs b/bindings/nodejs/tests/suites/index.mjs
index cbfe739ee..e9f706f8a 100644
--- a/bindings/nodejs/tests/suites/index.mjs
+++ b/bindings/nodejs/tests/suites/index.mjs
@@ -34,7 +34,11 @@ export function runner(testName, scheme) {
const config = loadConfigFromEnv(scheme)
if (checkRandomRootEnabled()) {
- config.root = generateRandomRoot(config.root)
+ if (config.root) {
+ config.root = generateRandomRoot(config.root)
+ } else {
+ console.warn("The root is not set. Won't generate random root.")
+ }
}
let operator = scheme ? new Operator(scheme, config) : null
diff --git a/bindings/nodejs/tests/suites/sync.suite.mjs b/bindings/nodejs/tests/suites/sync.suite.mjs
index d27a3895a..38c186d57 100644
--- a/bindings/nodejs/tests/suites/sync.suite.mjs
+++ b/bindings/nodejs/tests/suites/sync.suite.mjs
@@ -19,15 +19,66 @@
import { randomUUID } from 'node:crypto'
import { test } from 'vitest'
+import { WriteStream, ReadStream } from '../../index.js'
+import { generateFixedBytes } from '../utils.mjs'
+import { Readable } from 'node:stream'
-export function run(operator) {
- test('sync stat not exist files', () => {
- const filename = `random_file_${randomUUID()}`
+export function run(op) {
+ describe.runIf(op.capability().blocking)('sync tests', () => {
+ test('sync stat not exist files', () => {
+ const filename = `random_file_${randomUUID()}`
- try {
- operator.statSync(filename)
- } catch (error) {
- assert.include(error.message, 'NotFound')
- }
+ try {
+ op.statSync(filename)
+ } catch (error) {
+ assert.include(error.message, 'NotFound')
+ }
+ })
+
+ test.runIf(op.capability().write && op.capability().writeCanMulti)(
+ 'blocking reader/writer stream pipeline',
+ async () => {
+ const filename = `random_file_${randomUUID()}`
+ const buf = generateFixedBytes(5 * 1024 * 1024)
+ const rs = Readable.from(buf, {
+ highWaterMark: 5 * 1024 * 1024, // to buffer 5MB data to read
+ })
+ const w = op.writerSync(filename)
+ const ws = w.createWriteStream()
+ rs.pipe(ws)
+
+ ws.on('finish', () => {
+ const t = op.statSync(filename)
+ assert.equal(t.contentLength, buf.length)
+
+ const content = op.readSync(filename)
+ assert.equal(Buffer.compare(content, buf), 0) // 0 means equal
+
+ op.deleteSync(filename)
+ })
+ },
+ )
+
+ test.runIf(op.capability().write)('blocking read stream', async () => {
+ let c = generateFixedBytes(3 * 1024 * 1024)
+ const filename = `random_file_${randomUUID()}`
+
+ await op.write(filename, c)
+
+ const r = op.readerSync(filename)
+ const rs = r.createReadStream()
+
+ let chunks = []
+ rs.on('data', (chunk) => {
+ chunks.push(chunk)
+ })
+
+ rs.on('end', () => {
+ const buf = Buffer.concat(chunks)
+ assert.equal(Buffer.compare(buf, c), 0)
+
+ op.deleteSync(filename)
+ })
+ })
})
}
diff --git a/bindings/nodejs/tests/utils.mjs b/bindings/nodejs/tests/utils.mjs
index 6124cff98..bf685612e 100644
--- a/bindings/nodejs/tests/utils.mjs
+++ b/bindings/nodejs/tests/utils.mjs
@@ -19,15 +19,14 @@
import crypto from 'node:crypto'
+// Generate a buffer of random bytes whose size is a random whole number of KiB between 1 and 1024
export function generateBytes() {
- const size = Math.floor(Math.random() * 1024) + 1
- const content = []
-
- for (let i = 0; i < size; i++) {
- content.push(Math.floor(Math.random() * 256))
- }
+ return crypto.randomBytes((Math.floor(Math.random() * 1024) + 1) * 1024)
+}
- return Buffer.from(content)
+// Generate random bytes with given size
+export function generateFixedBytes(size) {
+ return crypto.randomBytes(size)
}
export function loadTestSchemeFromEnv() {
diff --git a/bindings/nodejs/vitest.config.js b/bindings/nodejs/vitest.config.js
index baaf4bde4..6c38b2dcb 100644
--- a/bindings/nodejs/vitest.config.js
+++ b/bindings/nodejs/vitest.config.js
@@ -28,5 +28,6 @@ export default defineConfig({
environment: 'node',
dir: 'tests',
reporters: 'basic',
+ testTimeout: 300 * 1000,
},
})
diff --git a/bindings/nodejs/yarn.lock b/bindings/nodejs/yarn.lock
index 40a9b1cbf..e601368ff 100644
--- a/bindings/nodejs/yarn.lock
+++ b/bindings/nodejs/yarn.lock
@@ -2687,7 +2687,7 @@ __metadata:
languageName: node
linkType: hard
-"minimatch@npm:^9.0.0, minimatch@npm:^9.0.1":
+"minimatch@npm:^9.0.1, minimatch@npm:^9.0.3":
version: 9.0.3
resolution: "minimatch@npm:9.0.3"
dependencies:
@@ -2906,7 +2906,7 @@ __metadata:
benny: ^3.7.1
dotenv: ^16.0.3
prettier: ^2.8.4
- typedoc: ^0.24
+ typedoc: ^0.25
typescript: ^5.0.2
vitest: ^0.34.6
languageName: unknown
@@ -3396,19 +3396,19 @@ __metadata:
languageName: node
linkType: hard
-"typedoc@npm:^0.24":
- version: 0.24.8
- resolution: "typedoc@npm:0.24.8"
+"typedoc@npm:^0.25":
+ version: 0.25.4
+ resolution: "typedoc@npm:0.25.4"
dependencies:
lunr: ^2.3.9
marked: ^4.3.0
- minimatch: ^9.0.0
+ minimatch: ^9.0.3
shiki: ^0.14.1
peerDependencies:
- typescript: 4.6.x || 4.7.x || 4.8.x || 4.9.x || 5.0.x || 5.1.x
+ typescript: 4.6.x || 4.7.x || 4.8.x || 4.9.x || 5.0.x || 5.1.x || 5.2.x || 5.3.x
bin:
typedoc: bin/typedoc
- checksum: a46a14497f789fb3594e6c3af2e45276934ac46df40b7ed15a504ee51dc7a8013a2ffb3a54fd73abca6a2b71f97d3ec9ad356fa9aa81d29743e4645a965a2ae0
+ checksum: 6d441baa277c0db4d577db2932a7af316d175415841e2faf2e68e3eda6ad60356c54f56374f89c5233d7bd5c057b0337455e5d484d8463e1445e67c37a6d94eb
languageName: node
linkType: hard
diff --git a/core/src/services/swift/core.rs b/core/src/services/swift/core.rs
index e236339a2..ca98a518e 100644
--- a/core/src/services/swift/core.rs
+++ b/core/src/services/swift/core.rs
@@ -17,8 +17,9 @@
use std::fmt::Debug;
+use http::header;
+use http::Request;
use http::Response;
-use http::{header, Request};
use serde::Deserialize;
use crate::raw::*;
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index 6f21a13ba..06721cc4d 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -55,7 +55,8 @@ use blocking_read_only::behavior_blocking_read_only_tests;
use blocking_rename::behavior_blocking_rename_tests;
use blocking_write::behavior_blocking_write_tests;
// External dependences
-use libtest_mimic::{Arguments, Trial};
+use libtest_mimic::Arguments;
+use libtest_mimic::Trial;
use opendal::raw::tests::init_test_service;
use opendal::*;