This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new d61179d70 feat(javascript): add support for streaming ingest (#4125)
d61179d70 is described below

commit d61179d702bb174017cf0a97f70407db33061378
Author: Kent Wu <[email protected]>
AuthorDate: Fri Mar 20 19:18:32 2026 -0400

    feat(javascript): add support for streaming ingest (#4125)
    
    Adds streaming ingest support to the Node.js ADBC driver manager.
    
    **`AdbcConnection`:**
    - `ingestStream(tableName, reader, options?)` — streams a
    `RecordBatchReader` into a table, handling the full ingest lifecycle
    
    **`AdbcStatement`:**
    - `bindStream(reader)` — streams a `RecordBatchReader` as bound
    parameters and leaves execution to the caller
    
    **Implementation:** `ingestStream` uses a Rust `mpsc` channel to bridge
    the JS push loop and the ADBC `execute_update` call running on the
    thread pool. JS serializes each batch to Arrow IPC and sends it to the
    `ChannelBatchReader` on the Rust side which deserializes it. The channel
    is unbounded to avoid blocking the JS event loop.
    
    **Test Plan**
    ```
    npm run test
    ```
    
    Closes #4117
---
 javascript/__test__/ingest.spec.ts |  97 +++++++++++++++++++++++++++++-
 javascript/binding.d.ts            |   4 ++
 javascript/lib/index.ts            |  88 +++++++++++++++++++++++----
 javascript/lib/types.ts            |  24 ++++++++
 javascript/src/client.rs           |  62 +++++++++++++++++++
 javascript/src/lib.rs              | 118 +++++++++++++++++++++++++++++++++++++
 6 files changed, 380 insertions(+), 13 deletions(-)

diff --git a/javascript/__test__/ingest.spec.ts 
b/javascript/__test__/ingest.spec.ts
index 2cfc8c907..4c13eee7d 100644
--- a/javascript/__test__/ingest.spec.ts
+++ b/javascript/__test__/ingest.spec.ts
@@ -18,8 +18,8 @@
 import { test, before, after } from 'node:test'
 import assert from 'node:assert/strict'
 import { createSqliteDatabase } from './test_utils'
-import { AdbcDatabase, AdbcConnection, IngestMode } from '../lib/index.js'
-import { tableFromArrays, Table } from 'apache-arrow'
+import { AdbcDatabase, AdbcConnection, AdbcError, IngestMode } from 
'../lib/index.js'
+import { tableFromArrays, Table, RecordBatchReader, tableToIPC } from 
'apache-arrow'
 
 let db: AdbcDatabase
 let conn: AdbcConnection
@@ -91,6 +91,99 @@ test('ingest: multi-batch table inserts all batches', async 
() => {
   assert.strictEqual(result.numRows, 2)
 })
 
+test('ingestStream: streams batches into a new table', async () => {
+  const data = tableFromArrays({ id: [1, 2, 3], name: ['alice', 'bob', 
'carol'] })
+  const reader = RecordBatchReader.from(tableToIPC(data, 'stream'))
+  const rowCount = await conn.ingestStream('ingest_stream_basic', reader)
+  assert.strictEqual(rowCount, 3)
+
+  const result = await conn.query('SELECT id, name FROM ingest_stream_basic 
ORDER BY id')
+  assert.strictEqual(result.numRows, 3)
+  assert.strictEqual(result.getChildAt(0)?.get(0), 1)
+  assert.strictEqual(result.getChildAt(1)?.get(2), 'carol')
+})
+
+test('ingestStream: streams multi-batch data', async () => {
+  const batch = tableFromArrays({ id: [1], name: ['alice'] }).batches[0]
+  const multiTable = new Table([batch, batch, batch])
+  const reader = RecordBatchReader.from(tableToIPC(multiTable, 'stream'))
+  const rowCount = await conn.ingestStream('ingest_stream_multi', reader)
+  assert.strictEqual(rowCount, 3)
+
+  const result = await conn.query('SELECT id FROM ingest_stream_multi')
+  assert.strictEqual(result.numRows, 3)
+})
+
+test('ingestStream: append mode with stream', async () => {
+  const initial = tableFromArrays({ id: [1] })
+  await conn.ingest('ingest_stream_append', initial)
+
+  const more = tableFromArrays({ id: [2] })
+  const reader = RecordBatchReader.from(tableToIPC(more, 'stream'))
+  const rowCount = await conn.ingestStream('ingest_stream_append', reader, {
+    mode: IngestMode.Append,
+  })
+  assert.strictEqual(rowCount, 1)
+
+  const result = await conn.query('SELECT id FROM ingest_stream_append ORDER 
BY id')
+  assert.strictEqual(result.numRows, 2)
+})
+
+test('ingestStream: empty reader creates table with no rows', async () => {
+  const empty = tableFromArrays({ id: [] as number[] })
+  const reader = RecordBatchReader.from(tableToIPC(empty, 'stream'))
+  const rowCount = await conn.ingestStream('ingest_stream_empty', reader)
+  assert.strictEqual(rowCount, 0)
+
+  const result = await conn.query('SELECT id FROM ingest_stream_empty')
+  assert.strictEqual(result.numRows, 0)
+})
+
+test('ingestStream: schema mismatch on append surfaces AdbcError', async () => 
{
+  const initial = tableFromArrays({ id: [1] })
+  await conn.ingest('ingest_stream_mismatch', initial)
+
+  const bad = tableFromArrays({ id: [2], extra: ['oops'] })
+  const reader = RecordBatchReader.from(tableToIPC(bad, 'stream'))
+  await assert.rejects(
+    () => conn.ingestStream('ingest_stream_mismatch', reader, { mode: 
IngestMode.Append }),
+    (e: unknown) => {
+      assert.ok(e instanceof AdbcError)
+      assert.match(e.message, /no column named extra/i)
+      return true
+    },
+  )
+})
+
+test('ingestStream: many small batches', async () => {
+  const oneBatch = tableFromArrays({ id: [1] }).batches[0]
+  const batches = Array.from({ length: 100 }, () => oneBatch)
+  const bigTable = new Table(batches)
+  const reader = RecordBatchReader.from(tableToIPC(bigTable, 'stream'))
+  const rowCount = await conn.ingestStream('ingest_stream_many', reader)
+  assert.strictEqual(rowCount, 100)
+
+  const result = await conn.query('SELECT count(*) as cnt FROM 
ingest_stream_many')
+  assert.strictEqual(result.getChildAt(0)?.get(0), 100n)
+})
+
+test('ingestStream: reader error mid-iteration propagates', async () => {
+  async function* failingGenerator() {
+    yield tableToIPC(tableFromArrays({ id: [1] }), 'stream')
+    throw new Error('reader exploded')
+  }
+  const reader = await RecordBatchReader.from(failingGenerator())
+
+  await assert.rejects(
+    () => conn.ingestStream('ingest_stream_fail', reader),
+    (e: unknown) => {
+      assert.ok(e instanceof Error)
+      assert.match(e.message, /reader exploded/)
+      return true
+    },
+  )
+})
+
 test('ingest: create_append mode creates table if not exists then appends', 
async () => {
   const data = tableFromArrays({ id: [1] })
   await conn.ingest('ingest_create_append', data, { mode: 
IngestMode.CreateAppend })
diff --git a/javascript/binding.d.ts b/javascript/binding.d.ts
index c771d3ea7..b6891e90a 100644
--- a/javascript/binding.d.ts
+++ b/javascript/binding.d.ts
@@ -32,6 +32,10 @@ export declare class NativeAdbcStatement {
   executeQuery(): Promise<unknown>
   executeUpdate(): Promise<unknown>
   bind(data: Buffer): Promise<unknown>
+  startBindStreamExecute(schemaBytes: Buffer): Promise<unknown>
+  startBindStream(schemaBytes: Buffer): Promise<unknown>
+  pushBatch(data: Buffer): void
+  endStream(): void
   close(): void
 }
 export type _NativeAdbcStatement = NativeAdbcStatement
diff --git a/javascript/lib/index.ts b/javascript/lib/index.ts
index baa84326a..ea35ac725 100644
--- a/javascript/lib/index.ts
+++ b/javascript/lib/index.ts
@@ -215,20 +215,28 @@ export class AdbcConnection implements 
AdbcConnectionInterface {
     }
   }
 
+  private setIngestOptions(
+    stmt: { setOption(key: string, value: string): void },
+    tableName: string,
+    options?: IngestOptions,
+  ): void {
+    stmt.setOption('adbc.ingest.target_table', tableName)
+    stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
+    if (options?.catalog !== undefined) {
+      stmt.setOption('adbc.ingest.target_catalog', options.catalog)
+    }
+    if (options?.dbSchema !== undefined) {
+      stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
+    }
+    if (options?.temporary === true) {
+      stmt.setOption('adbc.ingest.temporary', 'true')
+    }
+  }
+
   async ingest(tableName: string, data: Table, options?: IngestOptions): 
Promise<number> {
     const stmt = await this.createStatement()
     try {
-      stmt.setOption('adbc.ingest.target_table', tableName)
-      stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
-      if (options?.catalog !== undefined) {
-        stmt.setOption('adbc.ingest.target_catalog', options.catalog)
-      }
-      if (options?.dbSchema !== undefined) {
-        stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
-      }
-      if (options?.temporary === true) {
-        stmt.setOption('adbc.ingest.temporary', 'true')
-      }
+      this.setIngestOptions(stmt, tableName, options)
       await stmt.bind(data)
       return await stmt.executeUpdate()
     } finally {
@@ -236,6 +244,38 @@ export class AdbcConnection implements 
AdbcConnectionInterface {
     }
   }
 
+  async ingestStream(tableName: string, reader: RecordBatchReader, options?: 
IngestOptions): Promise<number> {
+    const nativeStmt = (await this._inner.createStatement()) as 
NativeAdbcStatement
+    try {
+      this.setIngestOptions(nativeStmt, tableName, options)
+
+      await reader.open()
+      const schemaTable = new Table(reader.schema, [])
+      const schemaBytes = tableToIPC(schemaTable, 'stream')
+      const promise = 
nativeStmt.startBindStreamExecute(Buffer.from(schemaBytes))
+
+      let pushError: unknown
+      try {
+        for await (const batch of reader) {
+          const batchBytes = tableToIPC(new Table([batch]), 'stream')
+          nativeStmt.pushBatch(Buffer.from(batchBytes))
+        }
+      } catch (e) {
+        pushError = e
+      } finally {
+        nativeStmt.endStream()
+      }
+
+      const result = (await promise) as number
+      if (pushError) throw pushError
+      return result
+    } catch (e) {
+      throw AdbcError.fromError(e)
+    } finally {
+      nativeStmt.close()
+    }
+  }
+
   async execute(sql: string, params?: Table): Promise<number> {
     const stmt = await this.createStatement()
     try {
@@ -337,6 +377,32 @@ export class AdbcStatement implements 
AdbcStatementInterface {
     }
   }
 
+  async bindStream(reader: RecordBatchReader): Promise<void> {
+    try {
+      await reader.open()
+      const schemaTable = new Table(reader.schema, [])
+      const schemaBytes = tableToIPC(schemaTable, 'stream')
+      const bindPromise = this._inner.startBindStream(Buffer.from(schemaBytes))
+
+      let pushError: unknown
+      try {
+        for await (const batch of reader) {
+          const batchBytes = tableToIPC(new Table([batch]), 'stream')
+          this._inner.pushBatch(Buffer.from(batchBytes))
+        }
+      } catch (e) {
+        pushError = e
+      } finally {
+        this._inner.endStream()
+      }
+
+      await bindPromise
+      if (pushError) throw pushError
+    } catch (e) {
+      throw AdbcError.fromError(e)
+    }
+  }
+
   async close(): Promise<void> {
     try {
       await this._inner.close()
diff --git a/javascript/lib/types.ts b/javascript/lib/types.ts
index 59fb7487e..43b64856a 100644
--- a/javascript/lib/types.ts
+++ b/javascript/lib/types.ts
@@ -331,6 +331,20 @@ export interface AdbcConnection {
    */
   ingest(tableName: string, data: Table, options?: IngestOptions): 
Promise<number>
 
+  /**
+   * Ingest Arrow data from a stream into a database table.
+   *
+   * Unlike {@link ingest}, this method streams data batch-by-batch, avoiding
+   * full materialization in memory. Use this for large datasets that should
+   * not be buffered entirely.
+   *
+   * @param tableName The target table name.
+   * @param reader Arrow RecordBatchReader to stream.
+   * @param options Ingestion options (mode, catalog, dbSchema, temporary).
+   * @returns A Promise resolving to the number of rows ingested, or -1 if 
unknown.
+   */
+  ingestStream(tableName: string, reader: RecordBatchReader, options?: 
IngestOptions): Promise<number>
+
   /**
    * Execute a SQL statement (INSERT, UPDATE, DELETE, DDL) and return the row 
count.
    *
@@ -407,6 +421,16 @@ export interface AdbcStatement {
    */
   bind(data: Table): Promise<void>
 
+  /**
+   * Bind a stream of data for ingestion or parameterized queries.
+   *
+   * Streams batches one at a time to the driver, avoiding full
+   * materialization of the reader in memory.
+   *
+   * @param reader Arrow RecordBatchReader to bind.
+   */
+  bindStream(reader: RecordBatchReader): Promise<void>
+
   /**
    * Close the statement and release resources.
    */
diff --git a/javascript/src/client.rs b/javascript/src/client.rs
index b6bc9ac1e..be3e627e4 100644
--- a/javascript/src/client.rs
+++ b/javascript/src/client.rs
@@ -17,6 +17,7 @@
 
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
+use std::sync::mpsc;
 use std::sync::Mutex;
 
 use adbc_core::{
@@ -30,6 +31,7 @@ use adbc_driver_manager::{ManagedConnection, ManagedDatabase, 
ManagedDriver, Man
 use arrow_array::RecordBatchReader;
 use arrow_ipc::reader::StreamReader;
 use arrow_ipc::writer::StreamWriter;
+use arrow_schema::SchemaRef;
 
 #[derive(Debug, thiserror::Error)]
 pub enum ClientError {
@@ -287,6 +289,26 @@ impl AdbcStatementCore {
     self.inner.bind_stream(Box::new(reader))?;
     Ok(())
   }
+
+  pub fn bind_channel_stream(
+    &mut self,
+    schema_bytes: Vec<u8>,
+    receiver: mpsc::Receiver<Vec<u8>>,
+  ) -> Result<()> {
+    let reader = ChannelBatchReader::new(schema_bytes, receiver)?;
+    self.inner.bind_stream(Box::new(reader))?;
+    Ok(())
+  }
+
+  pub fn bind_stream_and_execute(
+    &mut self,
+    schema_bytes: Vec<u8>,
+    receiver: mpsc::Receiver<Vec<u8>>,
+  ) -> Result<i64> {
+    self.bind_channel_stream(schema_bytes, receiver)?;
+    let rows = self.inner.execute_update()?;
+    Ok(rows.unwrap_or(-1))
+  }
 }
 
 pub struct AdbcResultIteratorCore {
@@ -312,6 +334,46 @@ impl AdbcResultIteratorCore {
   }
 }
 
+/// A `RecordBatchReader` backed by a channel. Batches arrive as IPC bytes
+/// from the JS main thread and are deserialized on demand by the thread pool.
+pub struct ChannelBatchReader {
+  schema: SchemaRef,
+  receiver: mpsc::Receiver<Vec<u8>>,
+}
+
+impl ChannelBatchReader {
+  pub fn new(schema_bytes: Vec<u8>, receiver: mpsc::Receiver<Vec<u8>>) -> 
Result<Self> {
+    let ipc_reader = StreamReader::try_new(std::io::Cursor::new(schema_bytes), 
None)
+      .map_err(ClientError::Arrow)?;
+    let schema = ipc_reader.schema();
+    Ok(Self { schema, receiver })
+  }
+}
+
+impl Iterator for ChannelBatchReader {
+  type Item = std::result::Result<arrow_array::RecordBatch, 
arrow_schema::ArrowError>;
+
+  fn next(&mut self) -> Option<Self::Item> {
+    let bytes = self.receiver.recv().ok()?;
+    let mut ipc_reader = match 
StreamReader::try_new(std::io::Cursor::new(bytes), None) {
+      Ok(r) => r,
+      Err(e) => return Some(Err(e)),
+    };
+    match ipc_reader.next() {
+      Some(result) => Some(result),
+      None => Some(Err(arrow_schema::ArrowError::IpcError(
+        "Received IPC stream with no record batches".to_string(),
+      ))),
+    }
+  }
+}
+
+impl RecordBatchReader for ChannelBatchReader {
+  fn schema(&self) -> SchemaRef {
+    self.schema.clone()
+  }
+}
+
 fn map_database_options(
   opts: HashMap<String, String>,
 ) -> impl Iterator<Item = (OptionDatabase, OptionValue)> {
diff --git a/javascript/src/lib.rs b/javascript/src/lib.rs
index 94d5bc7a8..136e9292a 100644
--- a/javascript/src/lib.rs
+++ b/javascript/src/lib.rs
@@ -375,6 +375,7 @@ impl Task for CreateStatementTask {
   fn resolve(&mut self, _env: Env, output: Self::Output) -> 
Result<Self::JsValue> {
     Ok(_NativeAdbcStatement {
       inner: Some(Arc::new(Mutex::new(output))),
+      stream_sender: None,
     })
   }
 
@@ -536,6 +537,7 @@ impl Task for RollbackTask {
 #[napi]
 pub struct _NativeAdbcStatement {
   inner: Option<Arc<Mutex<CoreStatement>>>,
+  stream_sender: Option<std::sync::mpsc::Sender<Vec<u8>>>,
 }
 
 #[napi]
@@ -590,11 +592,127 @@ impl _NativeAdbcStatement {
     }))
   }
 
+  #[napi]
+  pub fn start_bind_stream_execute(
+    &mut self,
+    schema_bytes: Buffer,
+  ) -> Result<AsyncTask<BindStreamExecuteTask>> {
+    let mutex = self.inner.as_ref().ok_or_else(closed_err)?;
+    // Unbounded channel: a bounded channel would block the JS main thread.
+    let (sender, receiver) = std::sync::mpsc::channel::<Vec<u8>>();
+    self.stream_sender = Some(sender);
+    Ok(AsyncTask::new(BindStreamExecuteTask {
+      statement: mutex.clone(),
+      schema_bytes: schema_bytes.to_vec(),
+      receiver: Some(receiver),
+      adbc_err: None,
+    }))
+  }
+
+  #[napi]
+  pub fn start_bind_stream(&mut self, schema_bytes: Buffer) -> 
Result<AsyncTask<BindStreamTask>> {
+    let mutex = self.inner.as_ref().ok_or_else(closed_err)?;
+    // Unbounded channel: a bounded channel would block the JS main thread.
+    let (sender, receiver) = std::sync::mpsc::channel::<Vec<u8>>();
+    self.stream_sender = Some(sender);
+    Ok(AsyncTask::new(BindStreamTask {
+      statement: mutex.clone(),
+      schema_bytes: schema_bytes.to_vec(),
+      receiver: Some(receiver),
+      adbc_err: None,
+    }))
+  }
+
+  #[napi]
+  pub fn push_batch(&self, data: Buffer) -> Result<()> {
+    let sender = self
+      .stream_sender
+      .as_ref()
+      .ok_or_else(|| Error::new(Status::GenericFailure, "No active bind 
stream"))?;
+    sender
+      .send(data.to_vec())
+      .map_err(|_| Error::new(Status::GenericFailure, "Stream channel 
closed"))?;
+    Ok(())
+  }
+
+  #[napi]
+  pub fn end_stream(&mut self) -> Result<()> {
+    self.stream_sender.take();
+    Ok(())
+  }
+
   #[napi]
   pub fn close(&mut self) -> Result<()> {
     self.inner.take();
+    self.stream_sender.take();
+    Ok(())
+  }
+}
+
+pub struct BindStreamExecuteTask {
+  statement: Arc<Mutex<CoreStatement>>,
+  schema_bytes: Vec<u8>,
+  receiver: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
+  adbc_err: Option<adbc_core::error::Error>,
+}
+
+impl Task for BindStreamExecuteTask {
+  type Output = i64;
+  type JsValue = i64;
+
+  fn compute(&mut self) -> Result<Self::Output> {
+    let mut stmt = self
+      .statement
+      .lock()
+      .map_err(|e| Error::from_reason(e.to_string()))?;
+    let receiver = self.receiver.take().expect("compute called twice");
+    let schema_bytes = std::mem::take(&mut self.schema_bytes);
+    capture_adbc_err(
+      stmt.bind_stream_and_execute(schema_bytes, receiver),
+      &mut self.adbc_err,
+    )
+  }
+
+  fn resolve(&mut self, _env: Env, output: Self::Output) -> 
Result<Self::JsValue> {
+    Ok(output)
+  }
+
+  fn reject(&mut self, env: Env, err: Error) -> Result<Self::JsValue> {
+    reject_adbc(env, &mut self.adbc_err, err)
+  }
+}
+
+pub struct BindStreamTask {
+  statement: Arc<Mutex<CoreStatement>>,
+  schema_bytes: Vec<u8>,
+  receiver: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
+  adbc_err: Option<adbc_core::error::Error>,
+}
+
+impl Task for BindStreamTask {
+  type Output = ();
+  type JsValue = ();
+
+  fn compute(&mut self) -> Result<Self::Output> {
+    let mut stmt = self
+      .statement
+      .lock()
+      .map_err(|e| Error::from_reason(e.to_string()))?;
+    let receiver = self.receiver.take().expect("compute called twice");
+    let schema_bytes = std::mem::take(&mut self.schema_bytes);
+    capture_adbc_err(
+      stmt.bind_channel_stream(schema_bytes, receiver),
+      &mut self.adbc_err,
+    )
+  }
+
+  fn resolve(&mut self, _env: Env, _output: Self::Output) -> 
Result<Self::JsValue> {
     Ok(())
   }
+
+  fn reject(&mut self, env: Env, err: Error) -> Result<Self::JsValue> {
+    reject_adbc(env, &mut self.adbc_err, err)
+  }
 }
 
 pub struct ExecuteQueryTask {

Reply via email to