This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 74d26e22c feat(go/adbc/driver/flightsql): support bulk ingest (#3808)
74d26e22c is described below

commit 74d26e22c20451aab33e9c060b0baeccd384a0c1
Author: Philip Moore <[email protected]>
AuthorDate: Sun Jan 4 19:46:04 2026 -0500

    feat(go/adbc/driver/flightsql): support bulk ingest (#3808)
    
    Closes #1107.
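    
    For reference, a minimal sketch of how a client might drive the new
    ingest path through the Go ADBC API (errors ignored for brevity; the
    URI and table name are illustrative, not part of this change):
    
        import (
                "context"
        
                "github.com/apache/arrow-adbc/go/adbc"
                driver "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
                "github.com/apache/arrow-go/v18/arrow/memory"
        )
        
        ctx := context.Background()
        drv := driver.NewDriver(memory.DefaultAllocator)
        db, _ := drv.NewDatabase(map[string]string{
                adbc.OptionKeyURI: "grpc+tcp://localhost:31337",
        })
        cnxn, _ := db.Open(ctx)
        stmt, _ := cnxn.NewStatement()
        // Setting a target table routes ExecuteUpdate through
        // DoPut(CommandStatementIngest) instead of a SQL query.
        _ = stmt.SetOption(adbc.OptionKeyIngestTargetTable, "my_table")
        _ = stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate)
        _ = stmt.Bind(ctx, rec) // rec: an arrow.RecordBatch built by the caller
        nRows, _ := stmt.ExecuteUpdate(ctx) // rows ingested, as reported by the server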
---
 .env                                               |   3 +
 .github/workflows/integration.yml                  |  10 +-
 .github/workflows/native-unix.yml                  |   2 +-
 compose.yaml                                       |  21 ++
 .../driver/flightsql/flightsql_adbc_server_test.go | 406 +++++++++++++++++++++
 go/adbc/driver/flightsql/flightsql_adbc_test.go    |   9 +-
 go/adbc/driver/flightsql/flightsql_bulk_ingest.go  | 146 ++++++++
 go/adbc/driver/flightsql/flightsql_statement.go    | 115 +++++-
 python/adbc_driver_flightsql/tests/conftest.py     |  52 +++
 .../tests/test_bulk_ingest.py                      | 380 +++++++++++++++++++
 10 files changed, 1133 insertions(+), 11 deletions(-)

diff --git a/.env b/.env
index ac0b0e631..b11f0f030 100644
--- a/.env
+++ b/.env
@@ -58,3 +58,6 @@ ADBC_JDBC_POSTGRESQL_DATABASE=postgres
 ADBC_POSTGRESQL_TEST_URI="postgresql://localhost:5432/postgres?user=postgres&password=password"
 ADBC_SQLITE_FLIGHTSQL_URI=grpc+tcp://localhost:8080
 ADBC_TEST_FLIGHTSQL_URI=grpc+tcp://localhost:41414
+ADBC_GIZMOSQL_URI=grpc+tcp://localhost:31337
+ADBC_GIZMOSQL_USER=adbc_test_user
+ADBC_GIZMOSQL_PASSWORD=adbc_test_password
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index aa4569717..36669f71b 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -101,7 +101,7 @@ jobs:
           ./ci/scripts/cpp_test.sh "$(pwd)/build"
 
   flightsql:
-    name: "FlightSQL Integration Tests (Dremio and SQLite)"
+    name: "FlightSQL Integration Tests (Dremio, SQLite, and GizmoSQL)"
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v6
@@ -140,9 +140,9 @@ jobs:
       - name: Work around ASAN issue (GH-1617)
         run: |
           sudo sysctl vm.mmap_rnd_bits=28
-      - name: Start SQLite server and Dremio
+      - name: Start SQLite server, Dremio, and GizmoSQL
         run: |
-          docker compose up --detach flightsql-test flightsql-sqlite-test dremio dremio-init
+          docker compose up --detach flightsql-test flightsql-sqlite-test dremio dremio-init gizmosql-test
           pip install python-dotenv[cli]
           python -m dotenv -f .env list --format simple | tee -a $GITHUB_ENV
 
@@ -161,14 +161,14 @@ jobs:
         run: |
           ./ci/scripts/cpp_build.sh "$(pwd)" "$(pwd)/build"
           ./ci/scripts/cpp_test.sh "$(pwd)/build"
-      - name: Test Python Flight SQL driver against Dremio
+      - name: Test Python Flight SQL driver against Dremio and GizmoSQL
         env:
           BUILD_ALL: "0"
           BUILD_DRIVER_FLIGHTSQL: "1"
           PYTEST_ADDOPTS: "--error-for-skips"
         run: |
           ./ci/scripts/python_test.sh "$(pwd)" "$(pwd)/build"
-      - name: Stop SQLite server and Dremio
+      - name: Stop SQLite server, Dremio, and GizmoSQL
         run: |
           docker compose down
 
diff --git a/.github/workflows/native-unix.yml b/.github/workflows/native-unix.yml
index ebbbbaa73..814e17654 100644
--- a/.github/workflows/native-unix.yml
+++ b/.github/workflows/native-unix.yml
@@ -728,7 +728,7 @@ jobs:
             docs/source/python/recipe/*.py
       - name: Test Recipes (Python)
         run: |
-          docker compose up --detach --wait dremio flightsql-sqlite-test postgres-test
+          docker compose up --detach --wait dremio flightsql-sqlite-test postgres-test gizmosql-test
           docker compose run --rm dremio-init
           export ADBC_CPP_RECIPE_BIN=~/local/bin
           # Needed for the combined C++/Python driver example
diff --git a/compose.yaml b/compose.yaml
index 92e9af5ab..358fd392c 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -341,6 +341,27 @@ services:
       - "50051:50051"
       - "8090:8090"
 
+  gizmosql-test:
+    container_name: adbc_gizmosql_test
+    image: gizmodata/gizmosql:latest-slim
+    init: true
+    tty: true
+    environment:
+      GIZMOSQL_USERNAME: ${ADBC_GIZMOSQL_USER:-adbc_test_user}
+      GIZMOSQL_PASSWORD: ${ADBC_GIZMOSQL_PASSWORD:-adbc_test_password}
+      TLS_ENABLED: "0"
+      PRINT_QUERIES: "1"
+      DATABASE_FILENAME: "adbc_test.db"
+      GIZMOSQL_LOG_LEVEL: "DEBUG"
+    healthcheck:
+      test: ["CMD-SHELL", "bash -c '</dev/tcp/localhost/31337'"]
+      interval: 5s
+      timeout: 5s
+      retries: 10
+      start_period: 10s
+    ports:
+      - "31337:31337"
+
  ################################ Verification ################################
 
   verify-all-ubuntu:
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
index 52a2c3e09..f9cdcdee8 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
@@ -37,6 +37,7 @@ import (
        "net/http/httptest"
        "net/textproto"
        "os"
+       "slices"
        "strconv"
        "strings"
        "sync"
@@ -221,6 +222,10 @@ func TestGetObjects(t *testing.T) {
        suite.Run(t, &GetObjectsTests{})
 }
 
+func TestBulkIngest(t *testing.T) {
+       suite.Run(t, &BulkIngestTests{})
+}
+
 func TestOauth(t *testing.T) {
        suite.Run(t, &OAuthTests{})
 }
@@ -2692,3 +2697,404 @@ func (suite *GetObjectsTests) TestMetadataGetObjectsColumnsXdbc() {
                })
        }
 }
+
+// ---- Bulk Ingest Tests --------------------
+
+// BulkIngestTestServer implements a FlightSQL server that supports bulk ingestion
+type BulkIngestTestServer struct {
+       flightsql.BaseServer
+
+       mu             sync.Mutex
+       ingestedData   []arrow.RecordBatch
+       ingestRequests []flightsql.StatementIngest
+}
+
+func (srv *BulkIngestTestServer) DoPutCommandStatementIngest(ctx context.Context, cmd flightsql.StatementIngest, rdr flight.MessageReader) (int64, error) {
+       srv.mu.Lock()
+       defer srv.mu.Unlock()
+
+       // Store the ingest request for validation
+       srv.ingestRequests = append(srv.ingestRequests, cmd)
+
+       var totalRows int64
+       for rdr.Next() {
+               rec := rdr.RecordBatch()
+               rec.Retain()
+               srv.ingestedData = append(srv.ingestedData, rec)
+               totalRows += rec.NumRows()
+       }
+
+       if err := rdr.Err(); err != nil {
+               return -1, err
+       }
+
+       return totalRows, nil
+}
+
+func (srv *BulkIngestTestServer) GetIngestedData() []arrow.RecordBatch {
+       srv.mu.Lock()
+       defer srv.mu.Unlock()
+       return slices.Clone(srv.ingestedData)
+}
+
+func (srv *BulkIngestTestServer) GetIngestRequests() []flightsql.StatementIngest {
+       srv.mu.Lock()
+       defer srv.mu.Unlock()
+       return slices.Clone(srv.ingestRequests)
+}
+
+func (srv *BulkIngestTestServer) Clear() {
+       srv.mu.Lock()
+       defer srv.mu.Unlock()
+       for _, rec := range srv.ingestedData {
+               rec.Release()
+       }
+       srv.ingestedData = nil
+       srv.ingestRequests = nil
+}
+
+type BulkIngestTests struct {
+       ServerBasedTests
+       server *BulkIngestTestServer
+}
+
+func (suite *BulkIngestTests) SetupSuite() {
+       suite.server = &BulkIngestTestServer{}
+       suite.DoSetupSuite(suite.server, nil, nil)
+}
+
+func (suite *BulkIngestTests) TearDownTest() {
+       suite.server.Clear()
+       suite.ServerBasedTests.TearDownTest()
+}
+
+func (suite *BulkIngestTests) TestBulkIngestCreate() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "test_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
+       bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob", "Charlie"}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind the data
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+
+       // Execute the ingest
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(3), nRows)
+
+       // Verify data was ingested
+       ingestedData := suite.server.GetIngestedData()
+       suite.Require().Len(ingestedData, 1)
+       suite.Equal(int64(3), ingestedData[0].NumRows())
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+       suite.Equal("test_table", requests[0].GetTable())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestAppend() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest with append mode
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "existing_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "value", Type: arrow.PrimitiveTypes.Float64, Nullable: 
false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2, 3.3, 4.4}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind and execute
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(4), nRows)
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+       suite.Equal("existing_table", requests[0].GetTable())
+
+       tableDefOpts := requests[0].GetTableDefinitionOptions()
+       suite.Require().NotNil(tableDefOpts)
+       // Append mode: fail if table doesn't exist, append if it does
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL, tableDefOpts.IfNotExist)
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestReplace() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest with replace mode
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "replace_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeReplace))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "col1", Type: arrow.PrimitiveTypes.Int32, Nullable: 
false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{10, 20}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind and execute
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(2), nRows)
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+
+       tableDefOpts := requests[0].GetTableDefinitionOptions()
+       suite.Require().NotNil(tableDefOpts)
+       // Replace mode: create if doesn't exist, replace if it does
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE, tableDefOpts.IfNotExist)
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestCreateAppend() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest with create_append mode
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "create_append_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreateAppend))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.StringBuilder).AppendValues([]string{"test1", "test2"}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind and execute
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(2), nRows)
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+
+       tableDefOpts := requests[0].GetTableDefinitionOptions()
+       suite.Require().NotNil(tableDefOpts)
+       // CreateAppend mode: create if doesn't exist, append if it does
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE, tableDefOpts.IfNotExist)
+       suite.Equal(flightproto.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND, tableDefOpts.IfExists)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithCatalogAndSchema() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest with catalog and schema
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "qualified_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTargetCatalog, "my_catalog"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTargetDBSchema, "my_schema"))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{100}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind and execute
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(1), nRows)
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+       suite.Equal("qualified_table", requests[0].GetTable())
+       suite.Equal("my_catalog", requests[0].GetCatalog())
+       suite.Equal("my_schema", requests[0].GetSchema())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestTemporary() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for temporary table ingest
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "temp_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionValueIngestTemporary, adbc.OptionValueEnabled))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "temp_col", Type: arrow.PrimitiveTypes.Int32, Nullable: 
false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind and execute
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(1), nRows)
+
+       // Verify request parameters
+       requests := suite.server.GetIngestRequests()
+       suite.Require().Len(requests, 1)
+       suite.True(requests[0].GetTemporary())
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithStream() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "stream_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+       // Create sample data with multiple batches
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "batch_id", Type: arrow.PrimitiveTypes.Int32, Nullable: 
false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+
+       bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
+       rec1 := bldr.NewRecordBatch()
+
+       bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{4, 5}, nil)
+       rec2 := bldr.NewRecordBatch()
+
+       bldr.Release()
+
+       // Create record reader from multiple batches
+       rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec1, rec2})
+       suite.Require().NoError(err)
+
+       // Bind stream and execute
+       suite.Require().NoError(stmt.BindStream(context.Background(), rdr))
+       nRows, err := stmt.ExecuteUpdate(context.Background())
+       suite.Require().NoError(err)
+       suite.Equal(int64(5), nRows)
+
+       // Verify data was ingested
+       ingestedData := suite.server.GetIngestedData()
+       suite.Require().Len(ingestedData, 2)
+
+       var totalRows int64
+       for _, rec := range ingestedData {
+               totalRows += rec.NumRows()
+       }
+       suite.Equal(int64(5), totalRows)
+}
+
+func (suite *BulkIngestTests) TestBulkIngestWithoutBind() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest without binding data
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "no_data_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+       // Try to execute without binding - should fail
+       _, err = stmt.ExecuteUpdate(context.Background())
+       suite.Require().Error(err)
+       suite.Contains(err.Error(), "must call Bind before bulk ingestion")
+}
+
+func (suite *BulkIngestTests) TestBulkIngestViaExecuteQuery() {
+       stmt, err := suite.cnxn.NewStatement()
+       suite.Require().NoError(err)
+       defer validation.CheckedClose(suite.T(), stmt)
+
+       // Set up for bulk ingest
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "query_table"))
+       suite.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
+
+       // Create sample data
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       // Bind the data
+       suite.Require().NoError(stmt.Bind(context.Background(), rec))
+
+       // Execute via ExecuteQuery (should work for ingest too)
+       rdr, nRows, err := stmt.ExecuteQuery(context.Background())
+       suite.Require().NoError(err)
+       suite.Nil(rdr) // No result set for ingest
+       suite.Equal(int64(2), nRows)
+
+       // Verify data was ingested
+       ingestedData := suite.server.GetIngestedData()
+       suite.Require().Len(ingestedData, 1)
+       suite.Equal(int64(2), ingestedData[0].NumRows())
+}
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_test.go b/go/adbc/driver/flightsql/flightsql_adbc_test.go
index d29a3f997..93d8b2b68 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_test.go
@@ -235,8 +235,13 @@ func (s *FlightSQLQuirks) DropTable(cnxn adbc.Connection, tblname string) (err e
        return err
 }
 
-func (s *FlightSQLQuirks) Alloc() memory.Allocator            { return s.mem }
-func (s *FlightSQLQuirks) BindParameter(_ int) string         { return "?" }
+func (s *FlightSQLQuirks) Alloc() memory.Allocator    { return s.mem }
+func (s *FlightSQLQuirks) BindParameter(_ int) string { return "?" }
+
+// SupportsBulkIngest returns false because the example SQLite test server does
+// not implement DoPutCommandStatementIngest. The driver itself supports bulk
+// ingest via FlightSQL's ExecuteIngest when connected to a server that supports
+// it - see BulkIngestTestServer and TestBulkIngest for those tests.
 func (s *FlightSQLQuirks) SupportsBulkIngest(string) bool     { return false }
 func (s *FlightSQLQuirks) SupportsConcurrentStatements() bool { return true }
 func (s *FlightSQLQuirks) SupportsCurrentCatalogSchema() bool { return false }
diff --git a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
new file mode 100644
index 000000000..9ae22ba84
--- /dev/null
+++ b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
+       pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+)
+
+// ingestOptions holds the configuration for bulk ingestion operations.
+type ingestOptions struct {
+       targetTable string
+       mode        string
+       catalog     *string
+       dbSchema    *string
+       temporary   bool
+}
+
+// buildTableDefinitionOptions maps ADBC ingest modes to FlightSQL TableDefinitionOptions.
+func buildTableDefinitionOptions(mode string) *flightsql.TableDefinitionOptions {
+       opts := &flightsql.TableDefinitionOptions{}
+
+       switch mode {
+       case adbc.OptionValueIngestModeCreate:
+               // Create new table, fail if exists
+               opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       case adbc.OptionValueIngestModeAppend:
+               // Append to existing table, fail if not exists
+               opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL
+               opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       case adbc.OptionValueIngestModeReplace:
+               // Replace table if exists, create if not
+               opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE
+       case adbc.OptionValueIngestModeCreateAppend:
+               // Create table if not exists, append if exists
+               opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       default:
+               // Default to create mode
+               opts.IfNotExist = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       }
+
+       return opts
+}
+
+// buildExecuteIngestOpts creates ExecuteIngestOpts from ingestOptions.
+func buildExecuteIngestOpts(opts ingestOptions) *flightsql.ExecuteIngestOpts {
+       tableDefOpts := buildTableDefinitionOptions(opts.mode)
+
+       ingestOpts := &flightsql.ExecuteIngestOpts{
+               TableDefinitionOptions: tableDefOpts,
+               Table:                  opts.targetTable,
+               Temporary:              opts.temporary,
+       }
+
+       if opts.catalog != nil {
+               ingestOpts.Catalog = opts.catalog
+       }
+       if opts.dbSchema != nil {
+               ingestOpts.Schema = opts.dbSchema
+       }
+
+       return ingestOpts
+}
+
+// createRecordReaderFromBatch converts a single record batch to a RecordReader.
+func createRecordReaderFromBatch(batch arrow.RecordBatch) (array.RecordReader, error) {
+       rdr, err := array.NewRecordReader(batch.Schema(), []arrow.RecordBatch{batch})
+       if err != nil {
+               return nil, adbc.Error{
+                       Msg:  fmt.Sprintf("[Flight SQL Statement] failed to create record reader: %s", err),
+                       Code: adbc.StatusInternal,
+               }
+       }
+       return rdr, nil
+}
+
+// executeIngest performs bulk ingestion using the FlightSQL client's ExecuteIngest method.
+// This is called from the statement when a target table has been set for bulk ingest.
+func (s *statement) executeIngest(ctx context.Context) (int64, error) {
+       if s.streamBind == nil && s.bound == nil {
+               return -1, adbc.Error{
+                       Msg:  "[Flight SQL Statement] must call Bind before bulk ingestion",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+
+       opts := ingestOptions{
+               targetTable: s.targetTable,
+               mode:        s.ingestMode,
+               catalog:     s.catalog,
+               dbSchema:    s.dbSchema,
+               temporary:   s.temporary,
+       }
+
+       ingestOpts := buildExecuteIngestOpts(opts)
+
+       // Get the record reader to ingest
+       var rdr array.RecordReader
+       var err error
+       if s.streamBind != nil {
+               rdr = s.streamBind
+       } else {
+               rdr, err = createRecordReaderFromBatch(s.bound)
+               if err != nil {
+                       return -1, err
+               }
+       }
+
+       ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
+       var header, trailer metadata.MD
+       callOpts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts)
+
+       nRows, err := s.cnxn.cl.ExecuteIngest(ctx, rdr, ingestOpts, callOpts...)
+       if err != nil {
+               return -1, adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteIngest")
+       }
+
+       return nRows, nil
+}
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go
index 30363af25..85f750a0c 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -168,6 +168,17 @@ type statement struct {
        progress         float64
        // may seem redundant, but incrementalState isn't locked
        lastInfo atomic.Pointer[flight.FlightInfo]
+
+       // Bulk ingest fields
+       targetTable string
+       ingestMode  string
+       catalog     *string
+       dbSchema    *string
+       temporary   bool
+
+       // Bound data for bulk ingest
+       bound      arrow.RecordBatch
+       streamBind array.RecordReader
 }
 
 func (s *statement) closePreparedStatement() error {
@@ -215,6 +226,15 @@ func (s *statement) Close() (err error) {
                }
        }
 
+       if s.bound != nil {
+               s.bound.Release()
+               s.bound = nil
+       }
+       if s.streamBind != nil {
+               s.streamBind.Release()
+               s.streamBind = nil
+       }
+
        s.clientCache = nil
        s.cnxn = nil
 
@@ -360,6 +380,53 @@ func (s *statement) SetOption(key string, val string) error {
                                Code: adbc.StatusInvalidArgument,
                        }
                }
+       case adbc.OptionKeyIngestTargetTable:
+               if s.prepared != nil {
+                       if err := s.closePreparedStatement(); err != nil {
+                               return err
+                       }
+                       s.prepared = nil
+               }
+               s.query.sqlQuery = ""
+               s.query.substraitPlan = nil
+               s.targetTable = val
+       case adbc.OptionKeyIngestMode:
+               switch val {
+               case adbc.OptionValueIngestModeCreate,
+                       adbc.OptionValueIngestModeAppend,
+                       adbc.OptionValueIngestModeReplace,
+                       adbc.OptionValueIngestModeCreateAppend:
+                       s.ingestMode = val
+               default:
+                       return adbc.Error{
+                               Msg:  fmt.Sprintf("[Flight SQL] Invalid ingest mode '%s'", val),
+                               Code: adbc.StatusInvalidArgument,
+                       }
+               }
+       case adbc.OptionValueIngestTargetCatalog:
+               if val == "" {
+                       s.catalog = nil
+               } else {
+                       s.catalog = &val
+               }
+       case adbc.OptionValueIngestTargetDBSchema:
+               if val == "" {
+                       s.dbSchema = nil
+               } else {
+                       s.dbSchema = &val
+               }
+       case adbc.OptionValueIngestTemporary:
+               switch val {
+               case adbc.OptionValueEnabled:
+                       s.temporary = true
+               case adbc.OptionValueDisabled:
+                       s.temporary = false
+               default:
+                       return adbc.Error{
+                               Msg:  fmt.Sprintf("[Flight SQL] Invalid statement option value %s=%s", key, val),
+                               Code: adbc.StatusInvalidArgument,
+                       }
+               }
        default:
                return adbc.Error{
                        Msg:  "[Flight SQL] Unknown statement option '" + key + 
"'",
@@ -421,6 +488,7 @@ func (s *statement) SetSqlQuery(query string) error {
        if err := s.clearIncrementalQuery(); err != nil {
                return err
        }
+       s.targetTable = ""
        s.query.setSqlQuery(query)
        return nil
 }
@@ -435,6 +503,12 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n
                return nil, -1, err
        }
 
+       // Handle bulk ingest
+       if s.targetTable != "" {
+               nrec, err = s.executeIngest(ctx)
+               return nil, nrec, err
+       }
+
        ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
        var info *flight.FlightInfo
        var header, trailer metadata.MD
@@ -461,6 +535,11 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) {
                return -1, err
        }
 
+       // Handle bulk ingest
+       if s.targetTable != "" {
+               return s.executeIngest(ctx)
+       }
+
        ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
        var header, trailer metadata.MD
        opts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts)
@@ -521,11 +600,25 @@ func (s *statement) SetSubstraitPlan(plan []byte) error {
 // but it may not do this until the statement is closed or another
 // record is bound.
 func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
-       // TODO: handle bulk insert situation
+       // For bulk ingest, bind to the statement
+       if s.targetTable != "" {
+               if s.streamBind != nil {
+                       s.streamBind.Release()
+                       s.streamBind = nil
+               }
+               if s.bound != nil {
+                       s.bound.Release()
+               }
+               s.bound = values
+               if s.bound != nil {
+                       s.bound.Retain()
+               }
+               return nil
+       }
 
        if s.prepared == nil {
                return adbc.Error{
-                       Msg:  "[Flight SQL Statement] must call Prepare before calling Bind",
+                       Msg:  "[Flight SQL Statement] must call Prepare or set IngestTargetTable before calling Bind",
                        Code: adbc.StatusInvalidState}
        }
 
@@ -540,9 +633,25 @@ func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
 // The driver will call Release on the record reader, but may not do this
 // until Close is called.
 func (s *statement) BindStream(_ context.Context, stream array.RecordReader) error {
+       // For bulk ingest, bind to the statement
+       if s.targetTable != "" {
+               if s.bound != nil {
+                       s.bound.Release()
+                       s.bound = nil
+               }
+               if s.streamBind != nil {
+                       s.streamBind.Release()
+               }
+               s.streamBind = stream
+               if s.streamBind != nil {
+                       s.streamBind.Retain()
+               }
+               return nil
+       }
+
        if s.prepared == nil {
                return adbc.Error{
-                       Msg:  "[Flight SQL Statement] must call Prepare before calling Bind",
+                       Msg:  "[Flight SQL Statement] must call Prepare or set IngestTargetTable before calling Bind",
                        Code: adbc.StatusInvalidState}
        }
 
diff --git a/python/adbc_driver_flightsql/tests/conftest.py b/python/adbc_driver_flightsql/tests/conftest.py
index 96105a6f2..4c775d8e6 100644
--- a/python/adbc_driver_flightsql/tests/conftest.py
+++ b/python/adbc_driver_flightsql/tests/conftest.py
@@ -85,3 +85,55 @@ def test_dbapi():
         autocommit=True,
     ) as conn:
         yield conn
+
+
[email protected](scope="session")
+def gizmosql_uri() -> str:
+    uri = os.environ.get("ADBC_GIZMOSQL_URI")
+    if not uri:
+        pytest.skip("Set ADBC_GIZMOSQL_URI to run tests")
+    return uri
+
+
[email protected](scope="session")
+def gizmosql_user() -> str:
+    username = os.environ.get("ADBC_GIZMOSQL_USER")
+    if not username:
+        pytest.skip("Set ADBC_GIZMOSQL_USER to run tests")
+    return username
+
+
[email protected](scope="session")
+def gizmosql_pass() -> str:
+    password = os.environ.get("ADBC_GIZMOSQL_PASSWORD")
+    if not password:
+        pytest.skip("Set ADBC_GIZMOSQL_PASSWORD to run tests")
+    return password
+
+
+@pytest.fixture
+def gizmosql(gizmosql_uri, gizmosql_user, gizmosql_pass):
+    """Create a low-level ADBC connection to the GizmoSQL server."""
+    with adbc_driver_flightsql.connect(
+        gizmosql_uri,
+        db_kwargs={
+            adbc_driver_manager.DatabaseOptions.USERNAME.value: gizmosql_user,
+            adbc_driver_manager.DatabaseOptions.PASSWORD.value: gizmosql_pass,
+        },
+    ) as db:
+        with adbc_driver_manager.AdbcConnection(db) as conn:
+            yield conn
+
+
+@pytest.fixture
+def gizmosql_dbapi(gizmosql_uri, gizmosql_user, gizmosql_pass):
+    """Create a DBAPI (PEP 249) connection to the GizmoSQL server."""
+    with adbc_driver_flightsql.dbapi.connect(
+        gizmosql_uri,
+        db_kwargs={
+            adbc_driver_manager.DatabaseOptions.USERNAME.value: gizmosql_user,
+            adbc_driver_manager.DatabaseOptions.PASSWORD.value: gizmosql_pass,
+        },
+        autocommit=True,
+    ) as conn:
+        yield conn
diff --git a/python/adbc_driver_flightsql/tests/test_bulk_ingest.py b/python/adbc_driver_flightsql/tests/test_bulk_ingest.py
new file mode 100644
index 000000000..1db468c50
--- /dev/null
+++ b/python/adbc_driver_flightsql/tests/test_bulk_ingest.py
@@ -0,0 +1,380 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for bulk ingest functionality using GizmoSQL."""
+
+import pyarrow as pa
+
+import adbc_driver_manager
+
+# Aliases for long constant names to keep lines under 88 chars
+INGEST_TARGET_TABLE = adbc_driver_manager.StatementOptions.INGEST_TARGET_TABLE.value
+INGEST_MODE = adbc_driver_manager.StatementOptions.INGEST_MODE.value
+
+
+def test_ingest_create(gizmosql):
+    """Test creating a new table via bulk ingest."""
+    table_name = "test_ingest_create"
+
+    # Create sample data
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int64()),
+            "name": pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+            "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+        }
+    )
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        # Configure for bulk ingest - create mode
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create",
+            }
+        )
+
+        # Bind the data
+        stmt.bind_stream(data.to_reader())
+
+        # Execute the ingest
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 3
+
+    # Verify the data was ingested by querying it back
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+        stream, _ = stmt.execute_query()
+        reader = pa.RecordBatchReader._import_from_c(stream.address)
+        result = reader.read_all()
+
+        assert result.num_rows == 3
+        assert result.column("id").to_pylist() == [1, 2, 3]
+        assert result.column("name").to_pylist() == ["Alice", "Bob", "Charlie"]
+
+
+def test_ingest_append(gizmosql):
+    """Test appending to an existing table via bulk ingest."""
+    table_name = "test_ingest_append"
+
+    # First create the table with initial data
+    initial_data = pa.table(
+        {
+            "id": pa.array([1, 2], type=pa.int64()),
+            "value": pa.array([10, 20], type=pa.int32()),
+        }
+    )
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create",
+            }
+        )
+        stmt.bind_stream(initial_data.to_reader())
+        stmt.execute_update()
+
+    # Now append more data
+    append_data = pa.table(
+        {
+            "id": pa.array([3, 4], type=pa.int64()),
+            "value": pa.array([30, 40], type=pa.int32()),
+        }
+    )
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.append",
+            }
+        )
+        stmt.bind_stream(append_data.to_reader())
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 2
+
+    # Verify all data is present
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+        stream, _ = stmt.execute_query()
+        reader = pa.RecordBatchReader._import_from_c(stream.address)
+        result = reader.read_all()
+        assert result.column("cnt")[0].as_py() == 4
+
+
+def test_ingest_replace(gizmosql):
+    """Test replacing a table via bulk ingest."""
+    table_name = "test_ingest_replace"
+
+    # First create the table with initial data
+    initial_data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int64()),
+        }
+    )
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create",
+            }
+        )
+        stmt.bind_stream(initial_data.to_reader())
+        stmt.execute_update()
+
+    # Now replace with new data
+    replace_data = pa.table(
+        {
+            "id": pa.array([100, 200], type=pa.int64()),
+        }
+    )
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.replace",
+            }
+        )
+        stmt.bind_stream(replace_data.to_reader())
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 2
+
+    # Verify only the new data is present
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+        stream, _ = stmt.execute_query()
+        reader = pa.RecordBatchReader._import_from_c(stream.address)
+        result = reader.read_all()
+        assert result.num_rows == 2
+        assert result.column("id").to_pylist() == [100, 200]
+
+
+def test_ingest_create_append(gizmosql):
+    """Test create_append mode - creates if not exists, appends if exists."""
+    table_name = "test_ingest_create_append"
+
+    data = pa.table(
+        {
+            "id": pa.array([1, 2], type=pa.int64()),
+        }
+    )
+
+    # First call should create the table
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create_append",
+            }
+        )
+        stmt.bind_stream(data.to_reader())
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 2
+
+    # Second call should append
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create_append",
+            }
+        )
+        stmt.bind_stream(data.to_reader())
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 2
+
+    # Verify we have 4 rows total
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+        stream, _ = stmt.execute_query()
+        reader = pa.RecordBatchReader._import_from_c(stream.address)
+        result = reader.read_all()
+        assert result.column("cnt")[0].as_py() == 4
+
+
+def test_ingest_multiple_batches(gizmosql):
+    """Test ingesting data with multiple record batches."""
+    table_name = "test_ingest_multi_batch"
+
+    # Create data with multiple batches
+    schema = pa.schema(
+        [
+            ("id", pa.int64()),
+            ("value", pa.string()),
+        ]
+    )
+
+    batches = [
+        pa.record_batch(
+            [
+                pa.array([1, 2, 3], type=pa.int64()),
+                pa.array(["a", "b", "c"], type=pa.string()),
+            ],
+            schema=schema,
+        ),
+        pa.record_batch(
+            [
+                pa.array([4, 5], type=pa.int64()),
+                pa.array(["d", "e"], type=pa.string()),
+            ],
+            schema=schema,
+        ),
+    ]
+
+    reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_options(
+            **{
+                INGEST_TARGET_TABLE: table_name,
+                INGEST_MODE: "adbc.ingest.mode.create",
+            }
+        )
+        stmt.bind_stream(reader)
+        rows_affected = stmt.execute_update()
+        assert rows_affected == 5
+
+    # Verify all data was ingested
+    with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+        stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+        stream, _ = stmt.execute_query()
+        reader = pa.RecordBatchReader._import_from_c(stream.address)
+        result = reader.read_all()
+        assert result.column("cnt")[0].as_py() == 5
+
+
+def test_ingest_via_dbapi_with_reader(gizmosql_dbapi):
+    """Test bulk ingest using the DBAPI interface with RecordBatchReader."""
+    table_name = "test_ingest_dbapi_reader"
+
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int64()),
+            "name": pa.array(["x", "y", "z"], type=pa.string()),
+        }
+    )
+
+    # Test using the cursor's adbc_ingest method with explicit reader
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, data.to_reader())
+        assert rows_affected == 3
+
+    # Verify via DBAPI query
+    with gizmosql_dbapi.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+        result = cur.fetchone()
+        assert result[0] == 3
+
+
+def test_ingest_via_dbapi_with_table(gizmosql_dbapi):
+    """Test bulk ingest using the DBAPI interface with Table directly."""
+    table_name = "test_ingest_dbapi_table"
+
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int64()),
+            "name": pa.array(["x", "y", "z"], type=pa.string()),
+        }
+    )
+
+    # Test using the cursor's adbc_ingest method with Table directly
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, data)
+        assert rows_affected == 3
+
+    # Verify via DBAPI query
+    with gizmosql_dbapi.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+        result = cur.fetchone()
+        assert result[0] == 3
+
+
+def test_ingest_dbapi_modes(gizmosql_dbapi):
+    """Test different ingest modes via DBAPI (append, replace, 
create_append)."""
+    table_name = "test_ingest_dbapi_modes"
+
+    # Create initial table
+    initial_data = pa.table(
+        {
+            "id": pa.array([1, 2], type=pa.int64()),
+        }
+    )
+
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, initial_data, mode="create")
+        assert rows_affected == 2
+
+    # Test append mode
+    append_data = pa.table(
+        {
+            "id": pa.array([3, 4], type=pa.int64()),
+        }
+    )
+
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, append_data, mode="append")
+        assert rows_affected == 2
+
+    # Verify 4 rows total
+    with gizmosql_dbapi.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+        assert cur.fetchone()[0] == 4
+
+    # Test replace mode
+    replace_data = pa.table(
+        {
+            "id": pa.array([100, 200, 300], type=pa.int64()),
+        }
+    )
+
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, replace_data, mode="replace")
+        assert rows_affected == 3
+
+    # Verify 3 rows (replaced)
+    with gizmosql_dbapi.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+        assert cur.fetchone()[0] == 3
+
+
+def test_ingest_dbapi_create_append(gizmosql_dbapi):
+    """Test create_append mode via DBAPI."""
+    table_name = "test_ingest_dbapi_create_append"
+
+    data = pa.table(
+        {
+            "id": pa.array([1, 2], type=pa.int64()),
+        }
+    )
+
+    # First call should create the table
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, data, mode="create_append")
+        assert rows_affected == 2
+
+    # Second call should append to existing table
+    with gizmosql_dbapi.cursor() as cur:
+        rows_affected = cur.adbc_ingest(table_name, data, mode="create_append")
+        assert rows_affected == 2
+
+    # Verify 4 rows total
+    with gizmosql_dbapi.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
+        assert cur.fetchone()[0] == 4

