This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 4096719cb feat: add convenience API for bulk ingest (#4116)
4096719cb is described below
commit 4096719cbf78a951d145605f54fb68485fd3d0bd
Author: Kent Wu <[email protected]>
AuthorDate: Tue Mar 17 21:36:37 2026 -0400
feat: add convenience API for bulk ingest (#4116)
This PR adds a higher-level API for bulk ingest on AdbcConnection,
wrapping the low-level setOption + bind + executeUpdate pattern.
```ts
await conn.ingest('my_table', data)
await conn.ingest('my_table', data, { mode: IngestMode.Append })
await conn.ingest('my_table', data, { mode: IngestMode.Replace, dbSchema: 'public' })
```
Also fixes the underlying `bind()` implementation to use
`AdbcStatementBindStream` instead of `AdbcStatementBind`, removing the
previous single-batch limitation. `bind()` now accepts `Tables` with any
number of batches.
Changes
- `AdbcConnection.ingest(tableName, data, options?)` convenience method
- `IngestMode` and `IngestOptions` exported types
- `AdbcStatement.bind()` now accepts `Table` only (was `RecordBatch |
Table`)
- `AdbcStatement.bind()` upgraded to use `AdbcStatementBindStream` in
the Rust layer, removing the single-batch limitation
**Test Plan**
```sh
npm test
```
closes #4100
---
javascript/__test__/bind.spec.ts | 37 +++++++++++---
javascript/__test__/ingest.spec.ts | 101 +++++++++++++++++++++++++++++++++++++
javascript/lib/index.ts | 48 ++++++++++--------
javascript/lib/types.ts | 56 ++++++++++++++++++--
javascript/src/client.rs | 18 +------
5 files changed, 213 insertions(+), 47 deletions(-)
diff --git a/javascript/__test__/bind.spec.ts b/javascript/__test__/bind.spec.ts
index 2599b5272..a4467ecd0 100644
--- a/javascript/__test__/bind.spec.ts
+++ b/javascript/__test__/bind.spec.ts
@@ -70,17 +70,40 @@ test('statement: bind and query data', async () => {
assert.strictEqual(rowCount, 1)
})
-test('statement: bind multi-batch table throws descriptive error', async () =>
{
- // Both batches must share the same schema instance for Table to accept them
+test('statement: bind empty table inserts 0 rows', async () => {
+ const empty = tableFromArrays({ id: [] as number[], name: [] as string[] })
+ assert.strictEqual(empty.numRows, 0)
+
+ const stmt2 = await conn.createStatement()
+ await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+ await stmt2.bind(empty)
+ const affected = await stmt2.executeUpdate()
+ assert.strictEqual(affected, 0)
+ await stmt2.close()
+})
+
+test('statement: bind single-batch table', async () => {
+ const data = tableFromArrays({ id: [100], name: ['single'] })
+ assert.strictEqual(data.batches.length, 1)
+
+ const stmt2 = await conn.createStatement()
+ await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+ await stmt2.bind(data)
+ const affected = await stmt2.executeUpdate()
+ assert.strictEqual(affected, 1)
+ await stmt2.close()
+})
+
+test('statement: bind multi-batch table', async () => {
const base = tableFromArrays({ id: [10], name: ['first'] })
const batch1 = base.batches[0]
- const batch2 = base.batches[0] // same schema, reused to construct a
multi-batch Table
- const multiTable = new Table([batch1, batch2])
+ const multiTable = new Table([batch1, batch1])
assert.strictEqual(multiTable.batches.length, 2)
const stmt2 = await conn.createStatement()
- const error = await stmt2.bind(multiTable).catch((e) => e)
- assert.ok(error instanceof Error)
- assert.match(error.message, /bind\(\).*batches|batches.*bind\(\)/i)
+ await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+ await stmt2.bind(multiTable)
+ const affected = await stmt2.executeUpdate()
+ assert.strictEqual(affected, 2)
await stmt2.close()
})
diff --git a/javascript/__test__/ingest.spec.ts
b/javascript/__test__/ingest.spec.ts
new file mode 100644
index 000000000..2cfc8c907
--- /dev/null
+++ b/javascript/__test__/ingest.spec.ts
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { test, before, after } from 'node:test'
+import assert from 'node:assert/strict'
+import { createSqliteDatabase } from './test_utils'
+import { AdbcDatabase, AdbcConnection, IngestMode } from '../lib/index.js'
+import { tableFromArrays, Table } from 'apache-arrow'
+
+let db: AdbcDatabase
+let conn: AdbcConnection
+
+before(async () => {
+ db = await createSqliteDatabase()
+ conn = await db.connect()
+})
+
+after(async () => {
+ try {
+ await conn?.close()
+ await db?.close()
+ } catch {
+ // ignore
+ }
+})
+
+test('ingest: create mode inserts data into a new table', async () => {
+ const data = tableFromArrays({ id: [1, 2, 3], name: ['alice', 'bob',
'carol'] })
+ const rowCount = await conn.ingest('ingest_create', data)
+ assert.strictEqual(rowCount, 3)
+
+ const result = await conn.query('SELECT id, name FROM ingest_create ORDER BY
id')
+ assert.strictEqual(result.numRows, 3)
+ assert.strictEqual(result.getChildAt(0)?.get(0), 1)
+ assert.strictEqual(result.getChildAt(1)?.get(0), 'alice')
+})
+
+test('ingest: create mode fails if table already exists', async () => {
+ const data = tableFromArrays({ id: [1] })
+ await conn.ingest('ingest_create_dup', data)
+ await assert.rejects(() => conn.ingest('ingest_create_dup', data))
+})
+
+test('ingest: append mode adds rows to an existing table', async () => {
+ const initial = tableFromArrays({ id: [1], name: ['alice'] })
+ await conn.ingest('ingest_append', initial)
+
+ const more = tableFromArrays({ id: [2], name: ['bob'] })
+ const rowCount = await conn.ingest('ingest_append', more, { mode:
IngestMode.Append })
+ assert.strictEqual(rowCount, 1)
+
+ const result = await conn.query('SELECT id FROM ingest_append ORDER BY id')
+ assert.strictEqual(result.numRows, 2)
+})
+
+test('ingest: replace mode drops and recreates the table', async () => {
+ const initial = tableFromArrays({ id: [1, 2, 3] })
+ await conn.ingest('ingest_replace', initial)
+
+ const replacement = tableFromArrays({ id: [99] })
+ await conn.ingest('ingest_replace', replacement, { mode: IngestMode.Replace
})
+
+ const result = await conn.query('SELECT id FROM ingest_replace')
+ assert.strictEqual(result.numRows, 1)
+ assert.strictEqual(result.getChildAt(0)?.get(0), 99)
+})
+
+test('ingest: multi-batch table inserts all batches', async () => {
+ const batch = tableFromArrays({ id: [1], name: ['alice'] }).batches[0]
+ const data = new Table([batch, batch])
+ assert.strictEqual(data.batches.length, 2)
+
+ const rowCount = await conn.ingest('ingest_multi_batch', data)
+ assert.strictEqual(rowCount, 2)
+
+ const result = await conn.query('SELECT id FROM ingest_multi_batch')
+ assert.strictEqual(result.numRows, 2)
+})
+
+test('ingest: create_append mode creates table if not exists then appends',
async () => {
+ const data = tableFromArrays({ id: [1] })
+ await conn.ingest('ingest_create_append', data, { mode:
IngestMode.CreateAppend })
+ await conn.ingest('ingest_create_append', data, { mode:
IngestMode.CreateAppend })
+
+ const result = await conn.query('SELECT id FROM ingest_create_append')
+ assert.strictEqual(result.numRows, 2)
+})
diff --git a/javascript/lib/index.ts b/javascript/lib/index.ts
index 0215b104a..baa84326a 100644
--- a/javascript/lib/index.ts
+++ b/javascript/lib/index.ts
@@ -23,10 +23,11 @@ import type {
AdbcStatement as AdbcStatementInterface,
ConnectOptions,
GetObjectsOptions,
+ IngestOptions,
} from './types.js'
-import { LoadFlags, ObjectDepth, InfoCode } from './types.js'
+import { LoadFlags, ObjectDepth, InfoCode, IngestMode } from './types.js'
-import { RecordBatchReader, RecordBatch, Table, tableToIPC, Schema } from
'apache-arrow'
+import { RecordBatch, RecordBatchReader, Table, tableToIPC, Schema } from
'apache-arrow'
import { AdbcError } from './error.js'
// Safely define Symbol.asyncDispose for compatibility with Node.js
environments older than v21.
@@ -75,8 +76,8 @@ async function iteratorToReader(iterator: NativeIterator):
Promise<RecordBatchRe
}
// Export Options types, constants, and Error class
-export type { ConnectOptions, GetObjectsOptions }
-export { AdbcError, LoadFlags, ObjectDepth, InfoCode }
+export type { ConnectOptions, GetObjectsOptions, IngestOptions }
+export { AdbcError, LoadFlags, ObjectDepth, InfoCode, IngestMode }
/**
* Represents an ADBC Database.
@@ -214,6 +215,27 @@ export class AdbcConnection implements
AdbcConnectionInterface {
}
}
+ async ingest(tableName: string, data: Table, options?: IngestOptions):
Promise<number> {
+ const stmt = await this.createStatement()
+ try {
+ stmt.setOption('adbc.ingest.target_table', tableName)
+ stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
+ if (options?.catalog !== undefined) {
+ stmt.setOption('adbc.ingest.target_catalog', options.catalog)
+ }
+ if (options?.dbSchema !== undefined) {
+ stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
+ }
+ if (options?.temporary === true) {
+ stmt.setOption('adbc.ingest.temporary', 'true')
+ }
+ await stmt.bind(data)
+ return await stmt.executeUpdate()
+ } finally {
+ await stmt.close()
+ }
+ }
+
async execute(sql: string, params?: Table): Promise<number> {
const stmt = await this.createStatement()
try {
@@ -306,23 +328,9 @@ export class AdbcStatement implements
AdbcStatementInterface {
}
}
- async bind(data: RecordBatch | Table): Promise<void> {
+ async bind(data: Table): Promise<void> {
try {
- let table: Table
- if (data instanceof Table) {
- table = data
- } else {
- table = new Table(data)
- }
-
- if (table.batches.length > 1) {
- throw new Error(
- `bind() requires a single-batch Table or RecordBatch, but received
${table.batches.length} batches. ` +
- `Concatenate the table into one batch first (e.g.
tableFromArrays(...)).`,
- )
- }
-
- const ipcBytes = tableToIPC(table, 'stream')
+ const ipcBytes = tableToIPC(data, 'stream')
await this._inner.bind(Buffer.from(ipcBytes))
} catch (e) {
throw AdbcError.fromError(e)
diff --git a/javascript/lib/types.ts b/javascript/lib/types.ts
index 524d98d73..59fb7487e 100644
--- a/javascript/lib/types.ts
+++ b/javascript/lib/types.ts
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-import { RecordBatch, RecordBatchReader, Table, Schema } from 'apache-arrow'
+import { RecordBatchReader, Table, Schema } from 'apache-arrow'
/**
* Bitmask flags controlling how the driver manager resolves a driver name.
@@ -144,6 +144,41 @@ export interface ConnectOptions {
databaseOptions?: Record<string, string>
}
+/**
+ * Ingestion modes for the `ingest` convenience method.
+ *
+ * These correspond to the `adbc.ingest.mode.*` option values in the ADBC spec.
+ *
+ * @example
+ * await conn.ingest('my_table', data, { mode: IngestMode.Append })
+ */
+export const IngestMode = {
+ /** Append to an existing table. Fails if the table does not exist. */
+ Append: 'adbc.ingest.mode.append',
+ /** Create a new table and insert. Fails if the table already exists. */
+ Create: 'adbc.ingest.mode.create',
+ /** Create the table if it does not exist, then append. */
+ CreateAppend: 'adbc.ingest.mode.create_append',
+ /** Drop the existing table (if any) and recreate it, then insert. */
+ Replace: 'adbc.ingest.mode.replace',
+} as const
+export type IngestMode = (typeof IngestMode)[keyof typeof IngestMode]
+
+/** Options for the `ingest` convenience method. */
+export interface IngestOptions {
+ /**
+ * How to handle an existing table.
+ * Defaults to {@link IngestMode.Create}.
+ */
+ mode?: IngestMode
+ /** The catalog to create/locate the target table in (optional). */
+ catalog?: string
+ /** The database schema to create/locate the target table in (optional). */
+ dbSchema?: string
+ /** Whether to ingest into a temporary table (optional). */
+ temporary?: boolean
+}
+
/** Options for getObjects metadata call. */
export interface GetObjectsOptions {
/**
@@ -282,6 +317,20 @@ export interface AdbcConnection {
*/
queryStream(sql: string, params?: Table): Promise<RecordBatchReader>
+ /**
+ * Ingest Arrow data into a database table.
+ *
+ * Convenience method that sets the ingestion options, binds the data, and
+ * calls executeUpdate. Depending on the driver, this can avoid per-row
+ * overhead compared to a prepare-bind-insert loop.
+ *
+ * @param tableName The target table name.
+ * @param data Arrow Table to ingest.
+ * @param options Ingestion options (mode, catalog, dbSchema, temporary).
+ * @returns A Promise resolving to the number of rows ingested, or -1 if
unknown.
+ */
+ ingest(tableName: string, data: Table, options?: IngestOptions):
Promise<number>
+
/**
* Execute a SQL statement (INSERT, UPDATE, DELETE, DDL) and return the row
count.
*
@@ -352,12 +401,11 @@ export interface AdbcStatement {
/**
* Bind parameters or data for ingestion.
*
- * This binds an Arrow RecordBatch or Table to the statement.
* This is used for bulk ingestion or parameterized queries.
*
- * @param data Arrow RecordBatch or Table containing the data to bind.
+ * @param data Arrow Table containing the data to bind.
*/
- bind(data: RecordBatch | Table): Promise<void>
+ bind(data: Table): Promise<void>
/**
* Close the statement and release resources.
diff --git a/javascript/src/client.rs b/javascript/src/client.rs
index 3bc2e8dbb..b6bc9ac1e 100644
--- a/javascript/src/client.rs
+++ b/javascript/src/client.rs
@@ -282,23 +282,9 @@ impl AdbcStatementCore {
}
pub fn bind(&mut self, c_data: Vec<u8>) -> Result<()> {
- let mut reader =
+ let reader =
StreamReader::try_new(std::io::Cursor::new(c_data),
None).map_err(ClientError::Arrow)?;
- let batch = match reader.next() {
- Some(Ok(b)) => b,
- Some(Err(e)) => return Err(ClientError::Arrow(e)),
- None => {
- return Err(ClientError::Other(
- "bind() received an empty record batch stream".to_string(),
- ))
- }
- };
- if reader.next().is_some() {
- return Err(ClientError::Other(
- "bind() received multiple record batches; concatenate into one batch
first".to_string(),
- ));
- }
- self.inner.bind(batch)?;
+ self.inner.bind_stream(Box::new(reader))?;
Ok(())
}
}