This is an automated email from the ASF dual-hosted git repository.
suyanhanx pushed a commit to branch nodejs-stream
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/nodejs-stream by this push:
new 3cc9e094c try read stream
3cc9e094c is described below
commit 3cc9e094cca90659ec457ace1e1b7434ac67bca2
Author: suyanhanx <[email protected]>
AuthorDate: Sun Nov 26 13:43:54 2023 +0800
try read stream
Signed-off-by: suyanhanx <[email protected]>
---
bindings/nodejs/generated.js | 3 ++-
bindings/nodejs/index.d.ts | 4 ++++
bindings/nodejs/index.js | 22 ++++++++++++++++++++++
bindings/nodejs/src/lib.rs | 19 +++++++++++++++++++
bindings/nodejs/tests/suites/sync.suite.mjs | 22 +++++++++++++++++++++-
5 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js
index 6f1f80175..4af2ebbb3 100644
--- a/bindings/nodejs/generated.js
+++ b/bindings/nodejs/generated.js
@@ -271,12 +271,13 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
-const { Capability, Operator, Entry, Metadata, BlockingWriter, Writer, Lister,
BlockingLister, Layer, RetryLayer } = nativeBinding
+const { Capability, Operator, Entry, Metadata, BlockingReader, BlockingWriter,
Writer, Lister, BlockingLister, Layer, RetryLayer } = nativeBinding
module.exports.Capability = Capability
module.exports.Operator = Operator
module.exports.Entry = Entry
module.exports.Metadata = Metadata
+module.exports.BlockingReader = BlockingReader
module.exports.BlockingWriter = BlockingWriter
module.exports.Writer = Writer
module.exports.Lister = Lister
diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts
index 560cd0eda..28e480328 100644
--- a/bindings/nodejs/index.d.ts
+++ b/bindings/nodejs/index.d.ts
@@ -248,6 +248,7 @@ export class Operator {
* ```
*/
readSync(path: string): Buffer
+ readerSync(path: string): BlockingReader
/**
* Write bytes into path.
*
@@ -543,6 +544,9 @@ export class Metadata {
*/
get lastModified(): string | null
}
+export class BlockingReader {
+ read(buf: Buffer): bigint
+}
export class BlockingWriter {
/**
* # Safety
diff --git a/bindings/nodejs/index.js b/bindings/nodejs/index.js
index 6c28368fd..67eea2c1d 100644
--- a/bindings/nodejs/index.js
+++ b/bindings/nodejs/index.js
@@ -21,6 +21,27 @@
const { Writable, Readable } = require('node:stream')
+class ReaderStream extends Readable {
+ constructor(operator, path, options) {
+ super(options)
+ this.reader = operator.readerSync(path)
+ }
+
+ _read(size) {
+ try {
+ const buf = Buffer.alloc(size)
+ let s = this.reader.read(buf)
+ if (s === 0n) {
+ this.push(null)
+ } else {
+ this.push(buf.subarray(0, Number(s)))
+ }
+ } catch (e) {
+ this.emit('error', e)
+ }
+ }
+}
+
class WriterStream extends Writable {
constructor(operator, path, options) {
super(options)
@@ -53,3 +74,4 @@ module.exports.layers = {
RetryLayer,
}
module.exports.WriterStream = WriterStream
+module.exports.ReaderStream = ReaderStream
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 027f574bb..0b9204668 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -24,6 +24,8 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
+use opendal::raw::oio::BlockingRead;
+
use futures::TryStreamExt;
use napi::bindgen_prelude::*;
@@ -188,6 +190,12 @@ impl Operator {
Ok(res.into())
}
+ #[napi]
+ pub fn reader_sync(&self, path: String) -> Result<BlockingReader> {
+ let r = self.0.blocking().reader(&path).map_err(format_napi_error)?;
+ Ok(BlockingReader(r))
+ }
+
/// Write bytes into path.
///
/// ### Example
@@ -628,6 +636,17 @@ impl Metadata {
}
}
+#[napi]
+pub struct BlockingReader(opendal::BlockingReader);
+
+#[napi]
+impl BlockingReader {
+ #[napi]
+ pub fn read(&mut self, mut buf: Buffer) -> Result<usize> {
+ self.0.read(buf.as_mut()).map_err(format_napi_error)
+ }
+}
+
#[napi]
pub struct BlockingWriter(opendal::BlockingWriter);
diff --git a/bindings/nodejs/tests/suites/sync.suite.mjs
b/bindings/nodejs/tests/suites/sync.suite.mjs
index dcf847aa2..9bd4262d1 100644
--- a/bindings/nodejs/tests/suites/sync.suite.mjs
+++ b/bindings/nodejs/tests/suites/sync.suite.mjs
@@ -19,7 +19,7 @@
import { randomUUID } from 'node:crypto'
import { test } from 'vitest'
-import { WriterStream } from '../../index.js'
+import { WriterStream, ReaderStream } from '../../index.js'
import { BufferStream, generateFixedBytes } from '../utils.mjs'
export function run(op) {
@@ -53,5 +53,25 @@ export function run(op) {
op.deleteSync(filename)
})
})
+
+ test('read stream', async () => {
+ let c = generateFixedBytes(5 * 1024 * 1024)
+ const filename = `random_file_${randomUUID()}`
+
+ op.writeSync(filename, c)
+
+ const r = new ReaderStream(op, filename)
+ let chunks = []
+ r.on('data', (chunk) => {
+ chunks.push(chunk)
+ })
+
+ r.on('end', () => {
+ const buf = Buffer.concat(chunks)
+ assert.equal(Buffer.compare(buf, c), 0)
+
+ op.deleteSync(filename)
+ })
+ })
})
}