This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 74d26e22c feat(go/adbc/driver/flightsql): support bulk ingest (#3808)
74d26e22c is described below
commit 74d26e22c20451aab33e9c060b0baeccd384a0c1
Author: Philip Moore <[email protected]>
AuthorDate: Sun Jan 4 19:46:04 2026 -0500
feat(go/adbc/driver/flightsql): support bulk ingest (#3808)
Closes #1107.
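
For context, a minimal sketch of how a client drives the new ingest path through the Go driver (connection setup elided; the table name and helper function are hypothetical, but the option constants and statement methods are the ones this commit wires up):

    import (
        "context"

        "github.com/apache/arrow-adbc/go/adbc"
        "github.com/apache/arrow-go/v18/arrow"
    )

    // ingestBatch bulk-ingests one record batch into a hypothetical
    // "my_table", creating the table if needed and appending otherwise.
    func ingestBatch(ctx context.Context, cnxn adbc.Connection, rec arrow.RecordBatch) (int64, error) {
        stmt, err := cnxn.NewStatement()
        if err != nil {
            return -1, err
        }
        defer stmt.Close()
        // Setting a target table switches the statement into ingest mode.
        if err := stmt.SetOption(adbc.OptionKeyIngestTargetTable, "my_table"); err != nil {
            return -1, err
        }
        if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreateAppend); err != nil {
            return -1, err
        }
        // Bind retains the batch; ExecuteUpdate then routes to FlightSQL's
        // ExecuteIngest and returns the server-reported row count.
        if err := stmt.Bind(ctx, rec); err != nil {
            return -1, err
        }
        return stmt.ExecuteUpdate(ctx)
    }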
---
.env | 3 +
.github/workflows/integration.yml | 10 +-
.github/workflows/native-unix.yml | 2 +-
compose.yaml | 21 ++
.../driver/flightsql/flightsql_adbc_server_test.go | 406 +++++++++++++++++++++
go/adbc/driver/flightsql/flightsql_adbc_test.go | 9 +-
go/adbc/driver/flightsql/flightsql_bulk_ingest.go | 146 ++++++++
go/adbc/driver/flightsql/flightsql_statement.go | 115 +++++-
python/adbc_driver_flightsql/tests/conftest.py | 52 +++
.../tests/test_bulk_ingest.py | 380 +++++++++++++++++++
10 files changed, 1133 insertions(+), 11 deletions(-)
diff --git a/.env b/.env
index ac0b0e631..b11f0f030 100644
--- a/.env
+++ b/.env
@@ -58,3 +58,6 @@ ADBC_JDBC_POSTGRESQL_DATABASE=postgres
ADBC_POSTGRESQL_TEST_URI="postgresql://localhost:5432/postgres?user=postgres&password=password"
ADBC_SQLITE_FLIGHTSQL_URI=grpc+tcp://localhost:8080
ADBC_TEST_FLIGHTSQL_URI=grpc+tcp://localhost:41414
+ADBC_GIZMOSQL_URI=grpc+tcp://localhost:31337
+ADBC_GIZMOSQL_USER=adbc_test_user
+ADBC_GIZMOSQL_PASSWORD=adbc_test_password
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index aa4569717..36669f71b 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -101,7 +101,7 @@ jobs:
./ci/scripts/cpp_test.sh "$(pwd)/build"
flightsql:
- name: "FlightSQL Integration Tests (Dremio and SQLite)"
+ name: "FlightSQL Integration Tests (Dremio, SQLite, and GizmoSQL)"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
@@ -140,9 +140,9 @@ jobs:
- name: Work around ASAN issue (GH-1617)
run: |
sudo sysctl vm.mmap_rnd_bits=28
- - name: Start SQLite server and Dremio
+ - name: Start SQLite server, Dremio, and GizmoSQL
run: |
- docker compose up --detach flightsql-test flightsql-sqlite-test dremio dremio-init
+ docker compose up --detach flightsql-test flightsql-sqlite-test dremio dremio-init gizmosql-test
pip install python-dotenv[cli]
python -m dotenv -f .env list --format simple | tee -a $GITHUB_ENV
@@ -161,14 +161,14 @@ jobs:
run: |
./ci/scripts/cpp_build.sh "$(pwd)" "$(pwd)/build"
./ci/scripts/cpp_test.sh "$(pwd)/build"
- - name: Test Python Flight SQL driver against Dremio
+ - name: Test Python Flight SQL driver against Dremio and GizmoSQL
env:
BUILD_ALL: "0"
BUILD_DRIVER_FLIGHTSQL: "1"
PYTEST_ADDOPTS: "--error-for-skips"
run: |
./ci/scripts/python_test.sh "$(pwd)" "$(pwd)/build"
- - name: Stop SQLite server and Dremio
+ - name: Stop SQLite server, Dremio, and GizmoSQL
run: |
docker compose down
diff --git a/.github/workflows/native-unix.yml b/.github/workflows/native-unix.yml
index ebbbbaa73..814e17654 100644
--- a/.github/workflows/native-unix.yml
+++ b/.github/workflows/native-unix.yml
@@ -728,7 +728,7 @@ jobs:
docs/source/python/recipe/*.py
- name: Test Recipes (Python)
run: |
- docker compose up --detach --wait dremio flightsql-sqlite-test postgres-test
+ docker compose up --detach --wait dremio flightsql-sqlite-test postgres-test gizmosql-test
docker compose run --rm dremio-init
export ADBC_CPP_RECIPE_BIN=~/local/bin
# Needed for the combined C++/Python driver example
diff --git a/compose.yaml b/compose.yaml
index 92e9af5ab..358fd392c 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -341,6 +341,27 @@ services:
- "50051:50051"
- "8090:8090"
+ gizmosql-test:
+ container_name: adbc_gizmosql_test
+ image: gizmodata/gizmosql:latest-slim
+ init: true
+ tty: true
+ environment:
+ GIZMOSQL_USERNAME: ${ADBC_GIZMOSQL_USER:-adbc_test_user}
+ GIZMOSQL_PASSWORD: ${ADBC_GIZMOSQL_PASSWORD:-adbc_test_password}
+ TLS_ENABLED: "0"
+ PRINT_QUERIES: "1"
+ DATABASE_FILENAME: "adbc_test.db"
+ GIZMOSQL_LOG_LEVEL: "DEBUG"
+ healthcheck:
+ test: ["CMD-SHELL", "bash -c '</dev/tcp/localhost/31337'"]
+ interval: 5s
+ timeout: 5s
+ retries: 10
+ start_period: 10s
+ ports:
+ - "31337:31337"
+
################################ Verification ################################
verify-all-ubuntu:
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
index 52a2c3e09..f9cdcdee8 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
@@ -37,6 +37,7 @@ import (
"net/http/httptest"
"net/textproto"
"os"
+ "slices"
"strconv"
"strings"
"sync"
@@ -221,6 +222,10 @@ func TestGetObjects(t *testing.T) {
suite.Run(t, &GetObjectsTests{})
}
+func TestBulkIngest(t *testing.T) {
+ suite.Run(t, &BulkIngestTests{})
+}
+
func TestOauth(t *testing.T) {
suite.Run(t, &OAuthTests{})
}
@@ -2692,3 +2697,404 @@ func (suite *GetObjectsTests) TestMetadataGetObjectsColumnsXdbc() {
})
}
}
+
+// ---- Bulk Ingest Tests --------------------
+
+// BulkIngestTestServer implements a FlightSQL server that supports bulk ingestion
+type BulkIngestTestServer struct {
+ flightsql.BaseServer
+
+ mu sync.Mutex
+ ingestedData []arrow.RecordBatch
+ ingestRequests []flightsql.StatementIngest
+}
+
+func (srv *BulkIngestTestServer) DoPutCommandStatementIngest(ctx context.Context, cmd flightsql.StatementIngest, rdr flight.MessageReader) (int64, error) {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+
+ // Store the ingest request for validation
+ srv.ingestRequests = append(srv.ingestRequests, cmd)
+
+ var totalRows int64
+ for rdr.Next() {
+ rec := rdr.RecordBatch()
+ rec.Retain()
+ srv.ingestedData = append(srv.ingestedData, rec)
+ totalRows += rec.NumRows()
+ }
+
+ if err := rdr.Err(); err != nil {
+ return -1, err
+ }
+
+ return totalRows, nil
+}
+
+func (srv *BulkIngestTestServer) GetIngestedData() []arrow.RecordBatch {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+ return slices.Clone(srv.ingestedData)
+}
+
+func (srv *BulkIngestTestServer) GetIngestRequests() []flightsql.StatementIngest {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+ return slices.Clone(srv.ingestRequests)
+}
+
+func (srv *BulkIngestTestServer) Clear() {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+ for _, rec := range srv.ingestedData {
+ rec.Release()
+ }
+ srv.ingestedData = nil
+ srv.ingestRequests = nil
+}
+
+type BulkIngestTests struct {
+ ServerBasedTests
+ server *BulkIngestTestServer
+}
+
+func (suite *BulkIngestTests) SetupSuite() {
+ suite.server = &BulkIngestTestServer{}
+ suite.DoSetupSuite(suite.server, nil, nil)
+}
+
+func (suite *BulkIngestTests) TearDownTest() {
+ suite.server.Clear()
+ suite.ServerBasedTests.TearDownTest()
+}
+
+func (suite *BulkIngestTests) TestBulkIngestCreate() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "test_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
+ bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob", "Charlie"}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind the data
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+
+ // Execute the ingest
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(3), nRows)
+
+ // Verify data was ingested
+ ingestedData := suite.server.GetIngestedData()
+ suite.Require().Len(ingestedData, 1)
+ suite.Equal(int64(3), ingestedData[0].NumRows())
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+ suite.Equal("test_table", requests[0].GetTable())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestAppend() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest with append mode
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "existing_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "value", Type: arrow.PrimitiveTypes.Float64, Nullable:
false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2, 3.3, 4.4}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind and execute
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(4), nRows)
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+ suite.Equal("existing_table", requests[0].GetTable())
+
+ tableDefOpts := requests[0].GetTableDefinitionOptions()
+ suite.Require().NotNil(tableDefOpts)
+ // Append mode: fail if table doesn't exist, append if it does
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL, tableDefOpts.IfNotExist)
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestReplace() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest with replace mode
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "replace_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeReplace))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "col1", Type: arrow.PrimitiveTypes.Int32, Nullable:
false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{10, 20}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind and execute
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(2), nRows)
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+
+ tableDefOpts := requests[0].GetTableDefinitionOptions()
+ suite.Require().NotNil(tableDefOpts)
+ // Replace mode: create if doesn't exist, replace if it does
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE, tableDefOpts.IfNotExist)
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestCreateAppend() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest with create_append mode
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "create_append_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreateAppend))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.StringBuilder).AppendValues([]string{"test1", "test2"}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind and execute
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(2), nRows)
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+
+ tableDefOpts := requests[0].GetTableDefinitionOptions()
+ suite.Require().NotNil(tableDefOpts)
+ // CreateAppend mode: create if doesn't exist, append if it does
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE, tableDefOpts.IfNotExist)
+ suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithCatalogAndSchema() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest with catalog and schema
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "qualified_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTargetCatalog, "my_catalog"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTargetDBSchema, "my_schema"))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{100}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind and execute
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(1), nRows)
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+ suite.Equal("qualified_table", requests[0].GetTable())
+ suite.Equal("my_catalog", requests[0].GetCatalog())
+ suite.Equal("my_schema", requests[0].GetSchema())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestTemporary() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for temporary table ingest
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "temp_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTemporary, adbc.OptionValueEnabled))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "temp_col", Type: arrow.PrimitiveTypes.Int32, Nullable:
false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind and execute
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(1), nRows)
+
+ // Verify request parameters
+ requests := suite.server.GetIngestRequests()
+ suite.Require().Len(requests, 1)
+ suite.True(requests[0].GetTemporary())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithStream() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "stream_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+ // Create sample data with multiple batches
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "batch_id", Type: arrow.PrimitiveTypes.Int32, Nullable:
false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
+ rec1 := bldr.NewRecordBatch()
+
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{4, 5}, nil)
+ rec2 := bldr.NewRecordBatch()
+
+ bldr.Release()
+
+ // Create record reader from multiple batches
+ rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec1, rec2})
+ suite.Require().NoError(err)
+
+ // Bind stream and execute
+ suite.Require().NoError(stmt.BindStream(context.Background(), rdr))
+ nRows, err := stmt.ExecuteUpdate(context.Background())
+ suite.Require().NoError(err)
+ suite.Equal(int64(5), nRows)
+
+ // Verify data was ingested
+ ingestedData := suite.server.GetIngestedData()
+ suite.Require().Len(ingestedData, 2)
+
+ var totalRows int64
+ for _, rec := range ingestedData {
+ totalRows += rec.NumRows()
+ }
+ suite.Equal(int64(5), totalRows)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithoutBind() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest without binding data
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "no_data_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+ // Try to execute without binding - should fail
+ _, err = stmt.ExecuteUpdate(context.Background())
+ suite.Require().Error(err)
+ suite.Contains(err.Error(), "must call Bind before bulk ingestion")
+}
+
+func (suite *BulkIngestTests) TestBulkIngestViaExecuteQuery() {
+ stmt, err := suite.cnxn.NewStatement()
+ suite.Require().NoError(err)
+ defer validation.CheckedClose(suite.T(), stmt)
+
+ // Set up for bulk ingest
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "query_table"))
+ suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+ // Create sample data
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ // Bind the data
+ suite.Require().NoError(stmt.Bind(context.Background(), rec))
+
+ // Execute via ExecuteQuery (should work for ingest too)
+ rdr, nRows, err := stmt.ExecuteQuery(context.Background())
+ suite.Require().NoError(err)
+ suite.Nil(rdr) // No result set for ingest
+ suite.Equal(int64(2), nRows)
+
+ // Verify data was ingested
+ ingestedData := suite.server.GetIngestedData()
+ suite.Require().Len(ingestedData, 1)
+ suite.Equal(int64(2), ingestedData[0].NumRows())
+}
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_test.go b/go/adbc/driver/flightsql/flightsql_adbc_test.go
index d29a3f997..93d8b2b68 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_test.go
@@ -235,8 +235,13 @@ func (s *FlightSQLQuirks) DropTable(cnxn adbc.Connection, tblname string) (err e
return err
}
-func (s *FlightSQLQuirks) Alloc() memory.Allocator { return s.mem }
-func (s *FlightSQLQuirks) BindParameter(_ int) string { return "?" }
+func (s *FlightSQLQuirks) Alloc() memory.Allocator { return s.mem }
+func (s *FlightSQLQuirks) BindParameter(_ int) string { return "?" }
+
+// SupportsBulkIngest returns false because the example SQLite test server does
+// not implement DoPutCommandStatementIngest. The driver itself supports bulk
+// ingest via FlightSQL's ExecuteIngest when connected to a server that supports
+// it - see BulkIngestTestServer and TestBulkIngest for those tests.
func (s *FlightSQLQuirks) SupportsBulkIngest(string) bool { return false }
func (s *FlightSQLQuirks) SupportsConcurrentStatements() bool { return true }
func (s *FlightSQLQuirks) SupportsCurrentCatalogSchema() bool { return false }
diff --git a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
new file mode 100644
index 000000000..9ae22ba84
--- /dev/null
+++ b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/arrow-adbc/go/adbc"
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
+ pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+// ingestOptions holds the configuration for bulk ingestion operations.
+type ingestOptions struct {
+ targetTable string
+ mode string
+ catalog *string
+ dbSchema *string
+ temporary bool
+}
+
+// buildTableDefinitionOptions maps ADBC ingest modes to FlightSQL TableDefinitionOptions.
+func buildTableDefinitionOptions(mode string) *flightsql.TableDefinitionOptions {
+ opts := &flightsql.TableDefinitionOptions{}
+
+ switch mode {
+ case adbc.OptionValueIngestModeCreate:
+ // Create new table, fail if exists
+ opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+ opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+ case adbc.OptionValueIngestModeAppend:
+ // Append to existing table, fail if not exists
+ opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL
+ opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+ case adbc.OptionValueIngestModeReplace:
+ // Replace table if exists, create if not
+ opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+ opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE
+ case adbc.OptionValueIngestModeCreateAppend:
+ // Create table if not exists, append if exists
+ opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+ opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+ default:
+ // Default to create mode
+ opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+ opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+ }
+
+ return opts
+}
+
+// buildExecuteIngestOpts creates ExecuteIngestOpts from ingestOptions.
+func buildExecuteIngestOpts(opts ingestOptions) *flightsql.ExecuteIngestOpts {
+ tableDefOpts := buildTableDefinitionOptions(opts.mode)
+
+ ingestOpts := &flightsql.ExecuteIngestOpts{
+ TableDefinitionOptions: tableDefOpts,
+ Table: opts.targetTable,
+ Temporary: opts.temporary,
+ }
+
+ if opts.catalog != nil {
+ ingestOpts.Catalog = opts.catalog
+ }
+ if opts.dbSchema != nil {
+ ingestOpts.Schema = opts.dbSchema
+ }
+
+ return ingestOpts
+}
+
+// createRecordReaderFromBatch converts a single record batch to a RecordReader.
+func createRecordReaderFromBatch(batch arrow.RecordBatch) (array.RecordReader, error) {
+ rdr, err := array.NewRecordReader(batch.Schema(), []arrow.RecordBatch{batch})
+ if err != nil {
+ return nil, adbc.Error{
+ Msg: fmt.Sprintf("[Flight SQL Statement] failed to
create record reader: %s", err),
+ Code: adbc.StatusInternal,
+ }
+ }
+ return rdr, nil
+}
+
+// executeIngest performs bulk ingestion using the FlightSQL client's ExecuteIngest method.
+// This is called from the statement when a target table has been set for bulk ingest.
+func (s *statement) executeIngest(ctx context.Context) (int64, error) {
+ if s.streamBind == nil && s.bound == nil {
+ return -1, adbc.Error{
+ Msg: "[Flight SQL Statement] must call Bind before
bulk ingestion",
+ Code: adbc.StatusInvalidState,
+ }
+ }
+
+ opts := ingestOptions{
+ targetTable: s.targetTable,
+ mode: s.ingestMode,
+ catalog: s.catalog,
+ dbSchema: s.dbSchema,
+ temporary: s.temporary,
+ }
+
+ ingestOpts := buildExecuteIngestOpts(opts)
+
+ // Get the record reader to ingest
+ var rdr array.RecordReader
+ var err error
+ if s.streamBind != nil {
+ rdr = s.streamBind
+ } else {
+ rdr, err = createRecordReaderFromBatch(s.bound)
+ if err != nil {
+ return -1, err
+ }
+ }
+
+ ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
+ var header, trailer metadata.MD
+ callOpts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts)
+
+ nRows, err := s.cnxn.cl.ExecuteIngest(ctx, rdr, ingestOpts, callOpts...)
+ if err != nil {
+ return -1, adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteIngest")
+ }
+
+ return nRows, nil
+}
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go
index 30363af25..85f750a0c 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -168,6 +168,17 @@ type statement struct {
progress float64
// may seem redundant, but incrementalState isn't locked
lastInfo atomic.Pointer[flight.FlightInfo]
+
+ // Bulk ingest fields
+ targetTable string
+ ingestMode string
+ catalog *string
+ dbSchema *string
+ temporary bool
+
+ // Bound data for bulk ingest
+ bound arrow.RecordBatch
+ streamBind array.RecordReader
}
func (s *statement) closePreparedStatement() error {
@@ -215,6 +226,15 @@ func (s *statement) Close() (err error) {
}
}
+ if s.bound != nil {
+ s.bound.Release()
+ s.bound = nil
+ }
+ if s.streamBind != nil {
+ s.streamBind.Release()
+ s.streamBind = nil
+ }
+
s.clientCache = nil
s.cnxn = nil
@@ -360,6 +380,53 @@ func (s *statement) SetOption(key string, val string) error {
Code: adbc.StatusInvalidArgument,
}
}
+ case adbc.OptionKeyIngestTargetTable:
+ if s.prepared != nil {
+ if err := s.closePreparedStatement(); err != nil {
+ return err
+ }
+ s.prepared = nil
+ }
+ s.query.sqlQuery = ""
+ s.query.substraitPlan = nil
+ s.targetTable = val
+ case adbc.OptionKeyIngestMode:
+ switch val {
+ case adbc.OptionValueIngestModeCreate,
+ adbc.OptionValueIngestModeAppend,
+ adbc.OptionValueIngestModeReplace,
+ adbc.OptionValueIngestModeCreateAppend:
+ s.ingestMode = val
+ default:
+ return adbc.Error{
+ Msg: fmt.Sprintf("[Flight SQL] Invalid ingest
mode '%s'", val),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
+ case adbc.OptionValueIngestTargetCatalog:
+ if val == "" {
+ s.catalog = nil
+ } else {
+ s.catalog = &val
+ }
+ case adbc.OptionValueIngestTargetDBSchema:
+ if val == "" {
+ s.dbSchema = nil
+ } else {
+ s.dbSchema = &val
+ }
+ case adbc.OptionValueIngestTemporary:
+ switch val {
+ case adbc.OptionValueEnabled:
+ s.temporary = true
+ case adbc.OptionValueDisabled:
+ s.temporary = false
+ default:
+ return adbc.Error{
+ Msg: fmt.Sprintf("[Flight SQL] Invalid
statement option value %s=%s", key, val),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
default:
return adbc.Error{
Msg: "[Flight SQL] Unknown statement option '" + key +
"'",
@@ -421,6 +488,7 @@ func (s *statement) SetSqlQuery(query string) error {
if err := s.clearIncrementalQuery(); err != nil {
return err
}
+ s.targetTable = ""
s.query.setSqlQuery(query)
return nil
}
@@ -435,6 +503,12 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n
return nil, -1, err
}
+ // Handle bulk ingest
+ if s.targetTable != "" {
+ nrec, err = s.executeIngest(ctx)
+ return nil, nrec, err
+ }
+
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
var info *flight.FlightInfo
var header, trailer metadata.MD
@@ -461,6 +535,11 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) {
return -1, err
}
+ // Handle bulk ingest
+ if s.targetTable != "" {
+ return s.executeIngest(ctx)
+ }
+
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
var header, trailer metadata.MD
opts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts)
@@ -521,11 +600,25 @@ func (s *statement) SetSubstraitPlan(plan []byte) error {
// but it may not do this until the statement is closed or another
// record is bound.
func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
- // TODO: handle bulk insert situation
+ // For bulk ingest, bind to the statement
+ if s.targetTable != "" {
+ if s.streamBind != nil {
+ s.streamBind.Release()
+ s.streamBind = nil
+ }
+ if s.bound != nil {
+ s.bound.Release()
+ }
+ s.bound = values
+ if s.bound != nil {
+ s.bound.Retain()
+ }
+ return nil
+ }
if s.prepared == nil {
return adbc.Error{
- Msg: "[Flight SQL Statement] must call Prepare before
calling Bind",
+ Msg: "[Flight SQL Statement] must call Prepare or set
IngestTargetTable before calling Bind",
Code: adbc.StatusInvalidState}
}
@@ -540,9 +633,25 @@ func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
// The driver will call Release on the record reader, but may not do this
// until Close is called.
func (s *statement) BindStream(_ context.Context, stream array.RecordReader) error {
+ // For bulk ingest, bind to the statement
+ if s.targetTable != "" {
+ if s.bound != nil {
+ s.bound.Release()
+ s.bound = nil
+ }
+ if s.streamBind != nil {
+ s.streamBind.Release()
+ }
+ s.streamBind = stream
+ if s.streamBind != nil {
+ s.streamBind.Retain()
+ }
+ return nil
+ }
+
if s.prepared == nil {
return adbc.Error{
- Msg: "[Flight SQL Statement] must call Prepare before
calling Bind",
+ Msg: "[Flight SQL Statement] must call Prepare or set
IngestTargetTable before calling Bind",
Code: adbc.StatusInvalidState}
}
diff --git a/python/adbc_driver_flightsql/tests/conftest.py b/python/adbc_driver_flightsql/tests/conftest.py
index 96105a6f2..4c775d8e6 100644
--- a/python/adbc_driver_flightsql/tests/conftest.py
+++ b/python/adbc_driver_flightsql/tests/conftest.py
@@ -85,3 +85,55 @@ def test_dbapi():
autocommit=True,
) as conn:
yield conn
+
+
[email protected](scope="session")
+def gizmosql_uri() -> str:
+ uri = os.environ.get("ADBC_GIZMOSQL_URI")
+ if not uri:
+ pytest.skip("Set ADBC_GIZMOSQL_URI to run tests")
+ return uri
+
+
[email protected](scope="session")
+def gizmosql_user() -> str:
+ username = os.environ.get("ADBC_GIZMOSQL_USER")
+ if not username:
+ pytest.skip("Set ADBC_GIZMOSQL_USER to run tests")
+ return username
+
+
[email protected](scope="session")
+def gizmosql_pass() -> str:
+ password = os.environ.get("ADBC_GIZMOSQL_PASSWORD")
+ if not password:
+ pytest.skip("Set ADBC_GIZMOSQL_PASSWORD to run tests")
+ return password
+
+
+@pytest.fixture
+def gizmosql(gizmosql_uri, gizmosql_user, gizmosql_pass):
+ """Create a low-level ADBC connection to the GizmoSQL server."""
+ with adbc_driver_flightsql.connect(
+ gizmosql_uri,
+ db_kwargs={
+ adbc_driver_manager.DatabaseOptions.USERNAME.value: gizmosql_user,
+ adbc_driver_manager.DatabaseOptions.PASSWORD.value: gizmosql_pass,
+ },
+ ) as db:
+ with adbc_driver_manager.AdbcConnection(db) as conn:
+ yield conn
+
+
+@pytest.fixture
+def gizmosql_dbapi(gizmosql_uri, gizmosql_user, gizmosql_pass):
+ """Create a DBAPI (PEP 249) connection to the GizmoSQL server."""
+ with adbc_driver_flightsql.dbapi.connect(
+ gizmosql_uri,
+ db_kwargs={
+ adbc_driver_manager.DatabaseOptions.USERNAME.value: gizmosql_user,
+ adbc_driver_manager.DatabaseOptions.PASSWORD.value: gizmosql_pass,
+ },
+ autocommit=True,
+ ) as conn:
+ yield conn
diff --git a/python/adbc_driver_flightsql/tests/test_bulk_ingest.py b/python/adbc_driver_flightsql/tests/test_bulk_ingest.py
new file mode 100644
index 000000000..1db468c50
--- /dev/null
+++ b/python/adbc_driver_flightsql/tests/test_bulk_ingest.py
@@ -0,0 +1,380 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for bulk ingest functionality using GizmoSQL."""
+
+import pyarrow as pa
+
+import adbc_driver_manager
+
+# Aliases for long constant names to keep lines under 88 chars
+INGEST_TARGET_TABLE = adbc_driver_manager.StatementOptions.INGEST_TARGET_TABLE.value
+INGEST_MODE = adbc_driver_manager.StatementOptions.INGEST_MODE.value
+
+
+def test_ingest_create(gizmosql):
+ """Test creating a new table via bulk ingest."""
+ table_name = "test_ingest_create"
+
+ # Create sample data
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], type=pa.int64()),
+ "name": pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+ "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+ }
+ )
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ # Configure for bulk ingest - create mode
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create",
+ }
+ )
+
+ # Bind the data
+ stmt.bind_stream(data.to_reader())
+
+ # Execute the ingest
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 3
+
+ # Verify the data was ingested by querying it back
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+ stream, _ = stmt.execute_query()
+ reader = pa.RecordBatchReader._import_from_c(stream.address)
+ result = reader.read_all()
+
+ assert result.num_rows == 3
+ assert result.column("id").to_pylist() == [1, 2, 3]
+ assert result.column("name").to_pylist() == ["Alice", "Bob", "Charlie"]
+
+
+def test_ingest_append(gizmosql):
+ """Test appending to an existing table via bulk ingest."""
+ table_name = "test_ingest_append"
+
+ # First create the table with initial data
+ initial_data = pa.table(
+ {
+ "id": pa.array([1, 2], type=pa.int64()),
+ "value": pa.array([10, 20], type=pa.int32()),
+ }
+ )
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create",
+ }
+ )
+ stmt.bind_stream(initial_data.to_reader())
+ stmt.execute_update()
+
+ # Now append more data
+ append_data = pa.table(
+ {
+ "id": pa.array([3, 4], type=pa.int64()),
+ "value": pa.array([30, 40], type=pa.int32()),
+ }
+ )
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.append",
+ }
+ )
+ stmt.bind_stream(append_data.to_reader())
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 2
+
+ # Verify all data is present
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+ stream, _ = stmt.execute_query()
+ reader = pa.RecordBatchReader._import_from_c(stream.address)
+ result = reader.read_all()
+ assert result.column("cnt")[0].as_py() == 4
+
+
+def test_ingest_replace(gizmosql):
+ """Test replacing a table via bulk ingest."""
+ table_name = "test_ingest_replace"
+
+ # First create the table with initial data
+ initial_data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], type=pa.int64()),
+ }
+ )
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create",
+ }
+ )
+ stmt.bind_stream(initial_data.to_reader())
+ stmt.execute_update()
+
+ # Now replace with new data
+ replace_data = pa.table(
+ {
+ "id": pa.array([100, 200], type=pa.int64()),
+ }
+ )
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.replace",
+ }
+ )
+ stmt.bind_stream(replace_data.to_reader())
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 2
+
+ # Verify only the new data is present
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+ stream, _ = stmt.execute_query()
+ reader = pa.RecordBatchReader._import_from_c(stream.address)
+ result = reader.read_all()
+ assert result.num_rows == 2
+ assert result.column("id").to_pylist() == [100, 200]
+
+
+def test_ingest_create_append(gizmosql):
+ """Test create_append mode - creates if not exists, appends if exists."""
+ table_name = "test_ingest_create_append"
+
+ data = pa.table(
+ {
+ "id": pa.array([1, 2], type=pa.int64()),
+ }
+ )
+
+ # First call should create the table
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create_append",
+ }
+ )
+ stmt.bind_stream(data.to_reader())
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 2
+
+ # Second call should append
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create_append",
+ }
+ )
+ stmt.bind_stream(data.to_reader())
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 2
+
+ # Verify we have 4 rows total
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+ stream, _ = stmt.execute_query()
+ reader = pa.RecordBatchReader._import_from_c(stream.address)
+ result = reader.read_all()
+ assert result.column("cnt")[0].as_py() == 4
+
+
+def test_ingest_multiple_batches(gizmosql):
+ """Test ingesting data with multiple record batches."""
+ table_name = "test_ingest_multi_batch"
+
+ # Create data with multiple batches
+ schema = pa.schema(
+ [
+ ("id", pa.int64()),
+ ("value", pa.string()),
+ ]
+ )
+
+ batches = [
+ pa.record_batch(
+ [
+ pa.array([1, 2, 3], type=pa.int64()),
+ pa.array(["a", "b", "c"], type=pa.string()),
+ ],
+ schema=schema,
+ ),
+ pa.record_batch(
+ [
+ pa.array([4, 5], type=pa.int64()),
+ pa.array(["d", "e"], type=pa.string()),
+ ],
+ schema=schema,
+ ),
+ ]
+
+ reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_options(
+ **{
+ INGEST_TARGET_TABLE: table_name,
+ INGEST_MODE: "adbc.ingest.mode.create",
+ }
+ )
+ stmt.bind_stream(reader)
+ rows_affected = stmt.execute_update()
+ assert rows_affected == 5
+
+ # Verify all data was ingested
+ with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+ stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+ stream, _ = stmt.execute_query()
+ reader = pa.RecordBatchReader._import_from_c(stream.address)
+ result = reader.read_all()
+ assert result.column("cnt")[0].as_py() == 5
+
+
+def test_ingest_via_dbapi_with_reader(gizmosql_dbapi):
+ """Test bulk ingest using the DBAPI interface with RecordBatchReader."""
+ table_name = "test_ingest_dbapi_reader"
+
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], type=pa.int64()),
+ "name": pa.array(["x", "y", "z"], type=pa.string()),
+ }
+ )
+
+ # Test using the cursor's adbc_ingest method with explicit reader
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, data.to_reader())
+ assert rows_affected == 3
+
+ # Verify via DBAPI query
+ with gizmosql_dbapi.cursor() as cur:
+ cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+ result = cur.fetchone()
+ assert result[0] == 3
+
+
+def test_ingest_via_dbapi_with_table(gizmosql_dbapi):
+ """Test bulk ingest using the DBAPI interface with Table directly."""
+ table_name = "test_ingest_dbapi_table"
+
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], type=pa.int64()),
+ "name": pa.array(["x", "y", "z"], type=pa.string()),
+ }
+ )
+
+ # Test using the cursor's adbc_ingest method with Table directly
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, data)
+ assert rows_affected == 3
+
+ # Verify via DBAPI query
+ with gizmosql_dbapi.cursor() as cur:
+ cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+ result = cur.fetchone()
+ assert result[0] == 3
+
+
+def test_ingest_dbapi_modes(gizmosql_dbapi):
+ """Test different ingest modes via DBAPI (append, replace,
create_append)."""
+ table_name = "test_ingest_dbapi_modes"
+
+ # Create initial table
+ initial_data = pa.table(
+ {
+ "id": pa.array([1, 2], type=pa.int64()),
+ }
+ )
+
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, initial_data, mode="create")
+ assert rows_affected == 2
+
+ # Test append mode
+ append_data = pa.table(
+ {
+ "id": pa.array([3, 4], type=pa.int64()),
+ }
+ )
+
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, append_data, mode="append")
+ assert rows_affected == 2
+
+ # Verify 4 rows total
+ with gizmosql_dbapi.cursor() as cur:
+ cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+ assert cur.fetchone()[0] == 4
+
+ # Test replace mode
+ replace_data = pa.table(
+ {
+ "id": pa.array([100, 200, 300], type=pa.int64()),
+ }
+ )
+
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, replace_data, mode="replace")
+ assert rows_affected == 3
+
+ # Verify 3 rows (replaced)
+ with gizmosql_dbapi.cursor() as cur:
+ cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+ assert cur.fetchone()[0] == 3
+
+
+def test_ingest_dbapi_create_append(gizmosql_dbapi):
+ """Test create_append mode via DBAPI."""
+ table_name = "test_ingest_dbapi_create_append"
+
+ data = pa.table(
+ {
+ "id": pa.array([1, 2], type=pa.int64()),
+ }
+ )
+
+ # First call should create the table
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, data, mode="create_append")
+ assert rows_affected == 2
+
+ # Second call should append to existing table
+ with gizmosql_dbapi.cursor() as cur:
+ rows_affected = cur.adbc_ingest(table_name, data, mode="create_append")
+ assert rows_affected == 2
+
+ # Verify 4 rows total
+ with gizmosql_dbapi.cursor() as cur:
+ cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+ assert cur.fetchone()[0] == 4