This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new d61179d70 feat(javascript): add support for streaming ingest (#4125)
d61179d70 is described below
commit d61179d702bb174017cf0a97f70407db33061378
Author: Kent Wu <[email protected]>
AuthorDate: Fri Mar 20 19:18:32 2026 -0400
feat(javascript): add support for streaming ingest (#4125)
Adds streaming ingest support to the Node.js ADBC driver manager.
**`AdbcConnection`:**
- `ingestStream(tableName, reader, options?)` — streams a
`RecordBatchReader` into a table, handling the full ingest lifecycle
**`AdbcStatement`:**
- `bindStream(reader)` — streams a `RecordBatchReader` as bound
parameters and leaves execution to the caller
**Implementation:** `ingestStream` uses a Rust `mpsc` channel to bridge
the JS push loop and the ADBC `execute_update` call running on the
thread pool. JS serializes each batch to Arrow IPC and sends it to the
`ChannelBatchReader` on the Rust side, which deserializes it. The channel
is unbounded to avoid blocking the JS event loop.
**Test Plan**
```
npm run test
```
Closes #4117
---
javascript/__test__/ingest.spec.ts | 97 +++++++++++++++++++++++++++++-
javascript/binding.d.ts | 4 ++
javascript/lib/index.ts | 88 +++++++++++++++++++++++----
javascript/lib/types.ts | 24 ++++++++
javascript/src/client.rs | 62 +++++++++++++++++++
javascript/src/lib.rs | 118 +++++++++++++++++++++++++++++++++++++
6 files changed, 380 insertions(+), 13 deletions(-)
diff --git a/javascript/__test__/ingest.spec.ts
b/javascript/__test__/ingest.spec.ts
index 2cfc8c907..4c13eee7d 100644
--- a/javascript/__test__/ingest.spec.ts
+++ b/javascript/__test__/ingest.spec.ts
@@ -18,8 +18,8 @@
import { test, before, after } from 'node:test'
import assert from 'node:assert/strict'
import { createSqliteDatabase } from './test_utils'
-import { AdbcDatabase, AdbcConnection, IngestMode } from '../lib/index.js'
-import { tableFromArrays, Table } from 'apache-arrow'
+import { AdbcDatabase, AdbcConnection, AdbcError, IngestMode } from
'../lib/index.js'
+import { tableFromArrays, Table, RecordBatchReader, tableToIPC } from
'apache-arrow'
let db: AdbcDatabase
let conn: AdbcConnection
@@ -91,6 +91,99 @@ test('ingest: multi-batch table inserts all batches', async
() => {
assert.strictEqual(result.numRows, 2)
})
+test('ingestStream: streams batches into a new table', async () => {
+ const data = tableFromArrays({ id: [1, 2, 3], name: ['alice', 'bob',
'carol'] })
+ const reader = RecordBatchReader.from(tableToIPC(data, 'stream'))
+ const rowCount = await conn.ingestStream('ingest_stream_basic', reader)
+ assert.strictEqual(rowCount, 3)
+
+ const result = await conn.query('SELECT id, name FROM ingest_stream_basic
ORDER BY id')
+ assert.strictEqual(result.numRows, 3)
+ assert.strictEqual(result.getChildAt(0)?.get(0), 1)
+ assert.strictEqual(result.getChildAt(1)?.get(2), 'carol')
+})
+
+test('ingestStream: streams multi-batch data', async () => {
+ const batch = tableFromArrays({ id: [1], name: ['alice'] }).batches[0]
+ const multiTable = new Table([batch, batch, batch])
+ const reader = RecordBatchReader.from(tableToIPC(multiTable, 'stream'))
+ const rowCount = await conn.ingestStream('ingest_stream_multi', reader)
+ assert.strictEqual(rowCount, 3)
+
+ const result = await conn.query('SELECT id FROM ingest_stream_multi')
+ assert.strictEqual(result.numRows, 3)
+})
+
+test('ingestStream: append mode with stream', async () => {
+ const initial = tableFromArrays({ id: [1] })
+ await conn.ingest('ingest_stream_append', initial)
+
+ const more = tableFromArrays({ id: [2] })
+ const reader = RecordBatchReader.from(tableToIPC(more, 'stream'))
+ const rowCount = await conn.ingestStream('ingest_stream_append', reader, {
+ mode: IngestMode.Append,
+ })
+ assert.strictEqual(rowCount, 1)
+
+ const result = await conn.query('SELECT id FROM ingest_stream_append ORDER
BY id')
+ assert.strictEqual(result.numRows, 2)
+})
+
+test('ingestStream: empty reader creates table with no rows', async () => {
+ const empty = tableFromArrays({ id: [] as number[] })
+ const reader = RecordBatchReader.from(tableToIPC(empty, 'stream'))
+ const rowCount = await conn.ingestStream('ingest_stream_empty', reader)
+ assert.strictEqual(rowCount, 0)
+
+ const result = await conn.query('SELECT id FROM ingest_stream_empty')
+ assert.strictEqual(result.numRows, 0)
+})
+
+test('ingestStream: schema mismatch on append surfaces AdbcError', async () =>
{
+ const initial = tableFromArrays({ id: [1] })
+ await conn.ingest('ingest_stream_mismatch', initial)
+
+ const bad = tableFromArrays({ id: [2], extra: ['oops'] })
+ const reader = RecordBatchReader.from(tableToIPC(bad, 'stream'))
+ await assert.rejects(
+ () => conn.ingestStream('ingest_stream_mismatch', reader, { mode:
IngestMode.Append }),
+ (e: unknown) => {
+ assert.ok(e instanceof AdbcError)
+ assert.match(e.message, /no column named extra/i)
+ return true
+ },
+ )
+})
+
+test('ingestStream: many small batches', async () => {
+ const oneBatch = tableFromArrays({ id: [1] }).batches[0]
+ const batches = Array.from({ length: 100 }, () => oneBatch)
+ const bigTable = new Table(batches)
+ const reader = RecordBatchReader.from(tableToIPC(bigTable, 'stream'))
+ const rowCount = await conn.ingestStream('ingest_stream_many', reader)
+ assert.strictEqual(rowCount, 100)
+
+ const result = await conn.query('SELECT count(*) as cnt FROM
ingest_stream_many')
+ assert.strictEqual(result.getChildAt(0)?.get(0), 100n)
+})
+
+test('ingestStream: reader error mid-iteration propagates', async () => {
+ async function* failingGenerator() {
+ yield tableToIPC(tableFromArrays({ id: [1] }), 'stream')
+ throw new Error('reader exploded')
+ }
+ const reader = await RecordBatchReader.from(failingGenerator())
+
+ await assert.rejects(
+ () => conn.ingestStream('ingest_stream_fail', reader),
+ (e: unknown) => {
+ assert.ok(e instanceof Error)
+ assert.match(e.message, /reader exploded/)
+ return true
+ },
+ )
+})
+
test('ingest: create_append mode creates table if not exists then appends',
async () => {
const data = tableFromArrays({ id: [1] })
await conn.ingest('ingest_create_append', data, { mode:
IngestMode.CreateAppend })
diff --git a/javascript/binding.d.ts b/javascript/binding.d.ts
index c771d3ea7..b6891e90a 100644
--- a/javascript/binding.d.ts
+++ b/javascript/binding.d.ts
@@ -32,6 +32,10 @@ export declare class NativeAdbcStatement {
executeQuery(): Promise<unknown>
executeUpdate(): Promise<unknown>
bind(data: Buffer): Promise<unknown>
+ startBindStreamExecute(schemaBytes: Buffer): Promise<unknown>
+ startBindStream(schemaBytes: Buffer): Promise<unknown>
+ pushBatch(data: Buffer): void
+ endStream(): void
close(): void
}
export type _NativeAdbcStatement = NativeAdbcStatement
diff --git a/javascript/lib/index.ts b/javascript/lib/index.ts
index baa84326a..ea35ac725 100644
--- a/javascript/lib/index.ts
+++ b/javascript/lib/index.ts
@@ -215,20 +215,28 @@ export class AdbcConnection implements
AdbcConnectionInterface {
}
}
+ private setIngestOptions(
+ stmt: { setOption(key: string, value: string): void },
+ tableName: string,
+ options?: IngestOptions,
+ ): void {
+ stmt.setOption('adbc.ingest.target_table', tableName)
+ stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
+ if (options?.catalog !== undefined) {
+ stmt.setOption('adbc.ingest.target_catalog', options.catalog)
+ }
+ if (options?.dbSchema !== undefined) {
+ stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
+ }
+ if (options?.temporary === true) {
+ stmt.setOption('adbc.ingest.temporary', 'true')
+ }
+ }
+
async ingest(tableName: string, data: Table, options?: IngestOptions):
Promise<number> {
const stmt = await this.createStatement()
try {
- stmt.setOption('adbc.ingest.target_table', tableName)
- stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
- if (options?.catalog !== undefined) {
- stmt.setOption('adbc.ingest.target_catalog', options.catalog)
- }
- if (options?.dbSchema !== undefined) {
- stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
- }
- if (options?.temporary === true) {
- stmt.setOption('adbc.ingest.temporary', 'true')
- }
+ this.setIngestOptions(stmt, tableName, options)
await stmt.bind(data)
return await stmt.executeUpdate()
} finally {
@@ -236,6 +244,38 @@ export class AdbcConnection implements
AdbcConnectionInterface {
}
}
+ async ingestStream(tableName: string, reader: RecordBatchReader, options?:
IngestOptions): Promise<number> {
+ const nativeStmt = (await this._inner.createStatement()) as
NativeAdbcStatement
+ try {
+ this.setIngestOptions(nativeStmt, tableName, options)
+
+ await reader.open()
+ const schemaTable = new Table(reader.schema, [])
+ const schemaBytes = tableToIPC(schemaTable, 'stream')
+ const promise =
nativeStmt.startBindStreamExecute(Buffer.from(schemaBytes))
+
+ let pushError: unknown
+ try {
+ for await (const batch of reader) {
+ const batchBytes = tableToIPC(new Table([batch]), 'stream')
+ nativeStmt.pushBatch(Buffer.from(batchBytes))
+ }
+ } catch (e) {
+ pushError = e
+ } finally {
+ nativeStmt.endStream()
+ }
+
+ const result = (await promise) as number
+ if (pushError) throw pushError
+ return result
+ } catch (e) {
+ throw AdbcError.fromError(e)
+ } finally {
+ nativeStmt.close()
+ }
+ }
+
async execute(sql: string, params?: Table): Promise<number> {
const stmt = await this.createStatement()
try {
@@ -337,6 +377,32 @@ export class AdbcStatement implements
AdbcStatementInterface {
}
}
+ async bindStream(reader: RecordBatchReader): Promise<void> {
+ try {
+ await reader.open()
+ const schemaTable = new Table(reader.schema, [])
+ const schemaBytes = tableToIPC(schemaTable, 'stream')
+ const bindPromise = this._inner.startBindStream(Buffer.from(schemaBytes))
+
+ let pushError: unknown
+ try {
+ for await (const batch of reader) {
+ const batchBytes = tableToIPC(new Table([batch]), 'stream')
+ this._inner.pushBatch(Buffer.from(batchBytes))
+ }
+ } catch (e) {
+ pushError = e
+ } finally {
+ this._inner.endStream()
+ }
+
+ await bindPromise
+ if (pushError) throw pushError
+ } catch (e) {
+ throw AdbcError.fromError(e)
+ }
+ }
+
async close(): Promise<void> {
try {
await this._inner.close()
diff --git a/javascript/lib/types.ts b/javascript/lib/types.ts
index 59fb7487e..43b64856a 100644
--- a/javascript/lib/types.ts
+++ b/javascript/lib/types.ts
@@ -331,6 +331,20 @@ export interface AdbcConnection {
*/
ingest(tableName: string, data: Table, options?: IngestOptions):
Promise<number>
+ /**
+ * Ingest Arrow data from a stream into a database table.
+ *
+ * Unlike {@link ingest}, this method streams data batch-by-batch, avoiding
+ * full materialization in memory. Use this for large datasets that should
+ * not be buffered entirely.
+ *
+ * @param tableName The target table name.
+ * @param reader Arrow RecordBatchReader to stream.
+ * @param options Ingestion options (mode, catalog, dbSchema, temporary).
+ * @returns A Promise resolving to the number of rows ingested, or -1 if
unknown.
+ */
+ ingestStream(tableName: string, reader: RecordBatchReader, options?:
IngestOptions): Promise<number>
+
/**
* Execute a SQL statement (INSERT, UPDATE, DELETE, DDL) and return the row
count.
*
@@ -407,6 +421,16 @@ export interface AdbcStatement {
*/
bind(data: Table): Promise<void>
+ /**
+ * Bind a stream of data for ingestion or parameterized queries.
+ *
+ * Streams batches one at a time to the driver, avoiding full
+ * materialization of the reader in memory.
+ *
+ * @param reader Arrow RecordBatchReader to bind.
+ */
+ bindStream(reader: RecordBatchReader): Promise<void>
+
/**
* Close the statement and release resources.
*/
diff --git a/javascript/src/client.rs b/javascript/src/client.rs
index b6bc9ac1e..be3e627e4 100644
--- a/javascript/src/client.rs
+++ b/javascript/src/client.rs
@@ -17,6 +17,7 @@
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
+use std::sync::mpsc;
use std::sync::Mutex;
use adbc_core::{
@@ -30,6 +31,7 @@ use adbc_driver_manager::{ManagedConnection, ManagedDatabase,
ManagedDriver, Man
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
+use arrow_schema::SchemaRef;
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
@@ -287,6 +289,26 @@ impl AdbcStatementCore {
self.inner.bind_stream(Box::new(reader))?;
Ok(())
}
+
+ pub fn bind_channel_stream(
+ &mut self,
+ schema_bytes: Vec<u8>,
+ receiver: mpsc::Receiver<Vec<u8>>,
+ ) -> Result<()> {
+ let reader = ChannelBatchReader::new(schema_bytes, receiver)?;
+ self.inner.bind_stream(Box::new(reader))?;
+ Ok(())
+ }
+
+ pub fn bind_stream_and_execute(
+ &mut self,
+ schema_bytes: Vec<u8>,
+ receiver: mpsc::Receiver<Vec<u8>>,
+ ) -> Result<i64> {
+ self.bind_channel_stream(schema_bytes, receiver)?;
+ let rows = self.inner.execute_update()?;
+ Ok(rows.unwrap_or(-1))
+ }
}
pub struct AdbcResultIteratorCore {
@@ -312,6 +334,46 @@ impl AdbcResultIteratorCore {
}
}
+/// A `RecordBatchReader` backed by a channel. Batches arrive as IPC bytes
+/// from the JS main thread and are deserialized on demand by the thread pool.
+pub struct ChannelBatchReader {
+ schema: SchemaRef,
+ receiver: mpsc::Receiver<Vec<u8>>,
+}
+
+impl ChannelBatchReader {
+ pub fn new(schema_bytes: Vec<u8>, receiver: mpsc::Receiver<Vec<u8>>) ->
Result<Self> {
+ let ipc_reader = StreamReader::try_new(std::io::Cursor::new(schema_bytes),
None)
+ .map_err(ClientError::Arrow)?;
+ let schema = ipc_reader.schema();
+ Ok(Self { schema, receiver })
+ }
+}
+
+impl Iterator for ChannelBatchReader {
+ type Item = std::result::Result<arrow_array::RecordBatch,
arrow_schema::ArrowError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let bytes = self.receiver.recv().ok()?;
+ let mut ipc_reader = match
StreamReader::try_new(std::io::Cursor::new(bytes), None) {
+ Ok(r) => r,
+ Err(e) => return Some(Err(e)),
+ };
+ match ipc_reader.next() {
+ Some(result) => Some(result),
+ None => Some(Err(arrow_schema::ArrowError::IpcError(
+ "Received IPC stream with no record batches".to_string(),
+ ))),
+ }
+ }
+}
+
+impl RecordBatchReader for ChannelBatchReader {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
fn map_database_options(
opts: HashMap<String, String>,
) -> impl Iterator<Item = (OptionDatabase, OptionValue)> {
diff --git a/javascript/src/lib.rs b/javascript/src/lib.rs
index 94d5bc7a8..136e9292a 100644
--- a/javascript/src/lib.rs
+++ b/javascript/src/lib.rs
@@ -375,6 +375,7 @@ impl Task for CreateStatementTask {
fn resolve(&mut self, _env: Env, output: Self::Output) ->
Result<Self::JsValue> {
Ok(_NativeAdbcStatement {
inner: Some(Arc::new(Mutex::new(output))),
+ stream_sender: None,
})
}
@@ -536,6 +537,7 @@ impl Task for RollbackTask {
#[napi]
pub struct _NativeAdbcStatement {
inner: Option<Arc<Mutex<CoreStatement>>>,
+ stream_sender: Option<std::sync::mpsc::Sender<Vec<u8>>>,
}
#[napi]
@@ -590,11 +592,127 @@ impl _NativeAdbcStatement {
}))
}
+ #[napi]
+ pub fn start_bind_stream_execute(
+ &mut self,
+ schema_bytes: Buffer,
+ ) -> Result<AsyncTask<BindStreamExecuteTask>> {
+ let mutex = self.inner.as_ref().ok_or_else(closed_err)?;
+ // Unbounded channel: a bounded channel would block the JS main thread.
+ let (sender, receiver) = std::sync::mpsc::channel::<Vec<u8>>();
+ self.stream_sender = Some(sender);
+ Ok(AsyncTask::new(BindStreamExecuteTask {
+ statement: mutex.clone(),
+ schema_bytes: schema_bytes.to_vec(),
+ receiver: Some(receiver),
+ adbc_err: None,
+ }))
+ }
+
+ #[napi]
+ pub fn start_bind_stream(&mut self, schema_bytes: Buffer) ->
Result<AsyncTask<BindStreamTask>> {
+ let mutex = self.inner.as_ref().ok_or_else(closed_err)?;
+ // Unbounded channel: a bounded channel would block the JS main thread.
+ let (sender, receiver) = std::sync::mpsc::channel::<Vec<u8>>();
+ self.stream_sender = Some(sender);
+ Ok(AsyncTask::new(BindStreamTask {
+ statement: mutex.clone(),
+ schema_bytes: schema_bytes.to_vec(),
+ receiver: Some(receiver),
+ adbc_err: None,
+ }))
+ }
+
+ #[napi]
+ pub fn push_batch(&self, data: Buffer) -> Result<()> {
+ let sender = self
+ .stream_sender
+ .as_ref()
+ .ok_or_else(|| Error::new(Status::GenericFailure, "No active bind
stream"))?;
+ sender
+ .send(data.to_vec())
+ .map_err(|_| Error::new(Status::GenericFailure, "Stream channel
closed"))?;
+ Ok(())
+ }
+
+ #[napi]
+ pub fn end_stream(&mut self) -> Result<()> {
+ self.stream_sender.take();
+ Ok(())
+ }
+
#[napi]
pub fn close(&mut self) -> Result<()> {
self.inner.take();
+ self.stream_sender.take();
+ Ok(())
+ }
+}
+
+pub struct BindStreamExecuteTask {
+ statement: Arc<Mutex<CoreStatement>>,
+ schema_bytes: Vec<u8>,
+ receiver: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
+ adbc_err: Option<adbc_core::error::Error>,
+}
+
+impl Task for BindStreamExecuteTask {
+ type Output = i64;
+ type JsValue = i64;
+
+ fn compute(&mut self) -> Result<Self::Output> {
+ let mut stmt = self
+ .statement
+ .lock()
+ .map_err(|e| Error::from_reason(e.to_string()))?;
+ let receiver = self.receiver.take().expect("compute called twice");
+ let schema_bytes = std::mem::take(&mut self.schema_bytes);
+ capture_adbc_err(
+ stmt.bind_stream_and_execute(schema_bytes, receiver),
+ &mut self.adbc_err,
+ )
+ }
+
+ fn resolve(&mut self, _env: Env, output: Self::Output) ->
Result<Self::JsValue> {
+ Ok(output)
+ }
+
+ fn reject(&mut self, env: Env, err: Error) -> Result<Self::JsValue> {
+ reject_adbc(env, &mut self.adbc_err, err)
+ }
+}
+
+pub struct BindStreamTask {
+ statement: Arc<Mutex<CoreStatement>>,
+ schema_bytes: Vec<u8>,
+ receiver: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
+ adbc_err: Option<adbc_core::error::Error>,
+}
+
+impl Task for BindStreamTask {
+ type Output = ();
+ type JsValue = ();
+
+ fn compute(&mut self) -> Result<Self::Output> {
+ let mut stmt = self
+ .statement
+ .lock()
+ .map_err(|e| Error::from_reason(e.to_string()))?;
+ let receiver = self.receiver.take().expect("compute called twice");
+ let schema_bytes = std::mem::take(&mut self.schema_bytes);
+ capture_adbc_err(
+ stmt.bind_channel_stream(schema_bytes, receiver),
+ &mut self.adbc_err,
+ )
+ }
+
+ fn resolve(&mut self, _env: Env, _output: Self::Output) ->
Result<Self::JsValue> {
Ok(())
}
+
+ fn reject(&mut self, env: Env, err: Error) -> Result<Self::JsValue> {
+ reject_adbc(env, &mut self.adbc_err, err)
+ }
}
pub struct ExecuteQueryTask {