lidavidm commented on code in PR #3808:
URL: https://github.com/apache/arrow-adbc/pull/3808#discussion_r2650263703


##########
go/adbc/driver/flightsql/flightsql_statement.go:
##########


Review Comment:
   setSqlQuery needs to clear targetTable



##########
go/adbc/driver/flightsql/flightsql_bulk_ingest.go:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
+       pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+)
+
+// ingestOptions holds the configuration for bulk ingestion operations.
+type ingestOptions struct {
+       targetTable string
+       mode        string
+       catalog     *string
+       dbSchema    *string
+       temporary   bool
+}
+
+// buildTableDefinitionOptions maps ADBC ingest modes to FlightSQL 
TableDefinitionOptions.
+func buildTableDefinitionOptions(mode string) 
*flightsql.TableDefinitionOptions {
+       opts := &flightsql.TableDefinitionOptions{}
+
+       switch mode {
+       case adbc.OptionValueIngestModeCreate:
+               // Create new table, fail if exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       case adbc.OptionValueIngestModeAppend:
+               // Append to existing table, fail if not exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       case adbc.OptionValueIngestModeReplace:
+               // Replace table if exists, create if not
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE
+       case adbc.OptionValueIngestModeCreateAppend:
+               // Create table if not exists, append if exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       default:
+               // Default to create mode
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       }
+
+       return opts
+}
+
+// buildExecuteIngestOpts creates ExecuteIngestOpts from ingestOptions.
+func buildExecuteIngestOpts(opts ingestOptions) *flightsql.ExecuteIngestOpts {
+       tableDefOpts := buildTableDefinitionOptions(opts.mode)
+
+       ingestOpts := &flightsql.ExecuteIngestOpts{
+               TableDefinitionOptions: tableDefOpts,
+               Table:                  opts.targetTable,
+               Temporary:              opts.temporary,
+       }
+
+       if opts.catalog != nil {
+               ingestOpts.Catalog = opts.catalog
+       }
+       if opts.dbSchema != nil {
+               ingestOpts.Schema = opts.dbSchema
+       }
+
+       return ingestOpts
+}
+
+// executeIngestWithReader performs the bulk ingest operation with the given 
record reader.
+func executeIngestWithReader(
+       ctx context.Context,
+       client *flightsql.Client,
+       rdr array.RecordReader,
+       opts *flightsql.ExecuteIngestOpts,
+       callOpts ...grpc.CallOption,
+) (int64, error) {
+       return client.ExecuteIngest(ctx, rdr, opts, callOpts...)
+}
+
+// createRecordReaderFromBatch converts a single record batch to a 
RecordReader.
+func createRecordReaderFromBatch(batch arrow.RecordBatch) (array.RecordReader, 
error) {
+       rdr, err := array.NewRecordReader(batch.Schema(), 
[]arrow.RecordBatch{batch})
+       if err != nil {
+               return nil, adbc.Error{
+                       Msg:  fmt.Sprintf("[Flight SQL Statement] failed to 
create record reader: %s", err.Error()),

Review Comment:
   nit: I think there's no need to explicitly call `err.Error()`



##########
python/adbc_driver_flightsql/tests/test_bulk_ingest.py:
##########
@@ -0,0 +1,415 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for bulk ingest functionality using GizmoSQL."""
+
+import pyarrow as pa
+
+import adbc_driver_manager
+
+# Aliases for long constant names to keep lines under 88 chars
+INGEST_TARGET_TABLE = 
adbc_driver_manager.StatementOptions.INGEST_TARGET_TABLE.value
+INGEST_MODE = adbc_driver_manager.StatementOptions.INGEST_MODE.value
+
+
+class TestBulkIngest:
+    """Test bulk ingest functionality."""
+
+    def test_ingest_create(self, gizmosql):
+        """Test creating a new table via bulk ingest."""
+        table_name = "test_ingest_create"
+
+        # Create sample data
+        data = pa.table(
+            {
+                "id": pa.array([1, 2, 3], type=pa.int64()),
+                "name": pa.array(["Alice", "Bob", "Charlie"], 
type=pa.string()),
+                "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            # Configure for bulk ingest - create mode
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+
+            # Bind the data
+            stmt.bind_stream(data.to_reader())
+
+            # Execute the ingest
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 3
+
+        # Verify the data was ingested by querying it back
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+
+            assert result.num_rows == 3
+            assert result.column("id").to_pylist() == [1, 2, 3]
+            assert result.column("name").to_pylist() == ["Alice", "Bob", 
"Charlie"]
+
+    def test_ingest_append(self, gizmosql):
+        """Test appending to an existing table via bulk ingest."""
+        table_name = "test_ingest_append"
+
+        # First create the table with initial data
+        initial_data = pa.table(
+            {
+                "id": pa.array([1, 2], type=pa.int64()),
+                "value": pa.array([10, 20], type=pa.int32()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(initial_data.to_reader())
+            stmt.execute_update()
+
+        # Now append more data
+        append_data = pa.table(
+            {
+                "id": pa.array([3, 4], type=pa.int64()),
+                "value": pa.array([30, 40], type=pa.int32()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.append",
+                }
+            )
+            stmt.bind_stream(append_data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify all data is present
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 4
+
+    def test_ingest_replace(self, gizmosql):
+        """Test replacing a table via bulk ingest."""
+        table_name = "test_ingest_replace"
+
+        # First create the table with initial data
+        initial_data = pa.table(
+            {
+                "id": pa.array([1, 2, 3], type=pa.int64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(initial_data.to_reader())
+            stmt.execute_update()
+
+        # Now replace with new data
+        replace_data = pa.table(
+            {
+                "id": pa.array([100, 200], type=pa.int64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.replace",
+                }
+            )
+            stmt.bind_stream(replace_data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify only the new data is present
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.num_rows == 2
+            assert result.column("id").to_pylist() == [100, 200]
+
+    def test_ingest_create_append(self, gizmosql):
+        """Test create_append mode - creates if not exists, appends if 
exists."""
+        table_name = "test_ingest_create_append"
+
+        data = pa.table(
+            {
+                "id": pa.array([1, 2], type=pa.int64()),
+            }
+        )
+
+        # First call should create the table
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create_append",
+                }
+            )
+            stmt.bind_stream(data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Second call should append
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create_append",
+                }
+            )
+            stmt.bind_stream(data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify we have 4 rows total
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 4
+
+    def test_ingest_multiple_batches(self, gizmosql):
+        """Test ingesting data with multiple record batches."""
+        table_name = "test_ingest_multi_batch"
+
+        # Create data with multiple batches
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("value", pa.string()),
+            ]
+        )
+
+        batches = [
+            pa.record_batch(
+                [
+                    pa.array([1, 2, 3], type=pa.int64()),
+                    pa.array(["a", "b", "c"], type=pa.string()),
+                ],
+                schema=schema,
+            ),
+            pa.record_batch(
+                [
+                    pa.array([4, 5], type=pa.int64()),
+                    pa.array(["d", "e"], type=pa.string()),
+                ],
+                schema=schema,
+            ),
+        ]
+
+        reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(reader)
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 5
+
+        # Verify all data was ingested
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 5
+
+    def test_ingest_various_types(self, gizmosql):
+        """Test ingesting data with various Arrow types."""
+        table_name = "test_ingest_types"
+
+        data = pa.table(
+            {
+                "col_int8": pa.array([1, 2, 3], type=pa.int8()),
+                "col_int16": pa.array([100, 200, 300], type=pa.int16()),
+                "col_int32": pa.array([1000, 2000, 3000], type=pa.int32()),
+                "col_int64": pa.array([10000, 20000, 30000], type=pa.int64()),
+                "col_float32": pa.array([1.1, 2.2, 3.3], type=pa.float32()),
+                "col_float64": pa.array([1.11, 2.22, 3.33], type=pa.float64()),
+                "col_string": pa.array(["hello", "world", "test"], 
type=pa.string()),
+                "col_bool": pa.array([True, False, True], type=pa.bool_()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 3
+
+        # Verify the data
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT * FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.num_rows == 3
+
+
+class TestBulkIngestDBAPI:
+    """Test bulk ingest using the DBAPI interface."""

Review Comment:
   Same question here as for `TestBulkIngest`: is the class needed? The test
names all encode the difference anyway, so why add the extra layer?



##########
python/adbc_driver_flightsql/tests/test_bulk_ingest.py:
##########
@@ -0,0 +1,415 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for bulk ingest functionality using GizmoSQL."""
+
+import pyarrow as pa
+
+import adbc_driver_manager
+
+# Aliases for long constant names to keep lines under 88 chars
+INGEST_TARGET_TABLE = 
adbc_driver_manager.StatementOptions.INGEST_TARGET_TABLE.value
+INGEST_MODE = adbc_driver_manager.StatementOptions.INGEST_MODE.value
+
+
+class TestBulkIngest:
+    """Test bulk ingest functionality."""

Review Comment:
   nit: is there a need for the class? It's not required in pytest.



##########
python/adbc_driver_flightsql/tests/test_bulk_ingest.py:
##########
@@ -0,0 +1,415 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for bulk ingest functionality using GizmoSQL."""
+
+import pyarrow as pa
+
+import adbc_driver_manager
+
+# Aliases for long constant names to keep lines under 88 chars
+INGEST_TARGET_TABLE = 
adbc_driver_manager.StatementOptions.INGEST_TARGET_TABLE.value
+INGEST_MODE = adbc_driver_manager.StatementOptions.INGEST_MODE.value
+
+
+class TestBulkIngest:
+    """Test bulk ingest functionality."""
+
+    def test_ingest_create(self, gizmosql):
+        """Test creating a new table via bulk ingest."""
+        table_name = "test_ingest_create"
+
+        # Create sample data
+        data = pa.table(
+            {
+                "id": pa.array([1, 2, 3], type=pa.int64()),
+                "name": pa.array(["Alice", "Bob", "Charlie"], 
type=pa.string()),
+                "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            # Configure for bulk ingest - create mode
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+
+            # Bind the data
+            stmt.bind_stream(data.to_reader())
+
+            # Execute the ingest
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 3
+
+        # Verify the data was ingested by querying it back
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+
+            assert result.num_rows == 3
+            assert result.column("id").to_pylist() == [1, 2, 3]
+            assert result.column("name").to_pylist() == ["Alice", "Bob", 
"Charlie"]
+
+    def test_ingest_append(self, gizmosql):
+        """Test appending to an existing table via bulk ingest."""
+        table_name = "test_ingest_append"
+
+        # First create the table with initial data
+        initial_data = pa.table(
+            {
+                "id": pa.array([1, 2], type=pa.int64()),
+                "value": pa.array([10, 20], type=pa.int32()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(initial_data.to_reader())
+            stmt.execute_update()
+
+        # Now append more data
+        append_data = pa.table(
+            {
+                "id": pa.array([3, 4], type=pa.int64()),
+                "value": pa.array([30, 40], type=pa.int32()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.append",
+                }
+            )
+            stmt.bind_stream(append_data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify all data is present
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 4
+
+    def test_ingest_replace(self, gizmosql):
+        """Test replacing a table via bulk ingest."""
+        table_name = "test_ingest_replace"
+
+        # First create the table with initial data
+        initial_data = pa.table(
+            {
+                "id": pa.array([1, 2, 3], type=pa.int64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(initial_data.to_reader())
+            stmt.execute_update()
+
+        # Now replace with new data
+        replace_data = pa.table(
+            {
+                "id": pa.array([100, 200], type=pa.int64()),
+            }
+        )
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.replace",
+                }
+            )
+            stmt.bind_stream(replace_data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify only the new data is present
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT * FROM {table_name} ORDER BY id")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.num_rows == 2
+            assert result.column("id").to_pylist() == [100, 200]
+
+    def test_ingest_create_append(self, gizmosql):
+        """Test create_append mode - creates if not exists, appends if 
exists."""
+        table_name = "test_ingest_create_append"
+
+        data = pa.table(
+            {
+                "id": pa.array([1, 2], type=pa.int64()),
+            }
+        )
+
+        # First call should create the table
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create_append",
+                }
+            )
+            stmt.bind_stream(data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Second call should append
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create_append",
+                }
+            )
+            stmt.bind_stream(data.to_reader())
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 2
+
+        # Verify we have 4 rows total
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 4
+
+    def test_ingest_multiple_batches(self, gizmosql):
+        """Test ingesting data with multiple record batches."""
+        table_name = "test_ingest_multi_batch"
+
+        # Create data with multiple batches
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("value", pa.string()),
+            ]
+        )
+
+        batches = [
+            pa.record_batch(
+                [
+                    pa.array([1, 2, 3], type=pa.int64()),
+                    pa.array(["a", "b", "c"], type=pa.string()),
+                ],
+                schema=schema,
+            ),
+            pa.record_batch(
+                [
+                    pa.array([4, 5], type=pa.int64()),
+                    pa.array(["d", "e"], type=pa.string()),
+                ],
+                schema=schema,
+            ),
+        ]
+
+        reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_options(
+                **{
+                    INGEST_TARGET_TABLE: table_name,
+                    INGEST_MODE: "adbc.ingest.mode.create",
+                }
+            )
+            stmt.bind_stream(reader)
+            rows_affected = stmt.execute_update()
+            assert rows_affected == 5
+
+        # Verify all data was ingested
+        with adbc_driver_manager.AdbcStatement(gizmosql) as stmt:
+            stmt.set_sql_query(f"SELECT COUNT(*) as cnt FROM {table_name}")
+            stream, _ = stmt.execute_query()
+            reader = pa.RecordBatchReader._import_from_c(stream.address)
+            result = reader.read_all()
+            assert result.column("cnt")[0].as_py() == 5
+
+    def test_ingest_various_types(self, gizmosql):

Review Comment:
   Is this really necessary? It's more a test of GizmoSQL than of the driver.



##########
go/adbc/driver/flightsql/flightsql_statement.go:
##########
@@ -360,6 +380,47 @@ func (s *statement) SetOption(key string, val string) 
error {
                                Code: adbc.StatusInvalidArgument,
                        }
                }
+       case adbc.OptionKeyIngestTargetTable:
+               s.query.sqlQuery = ""
+               s.query.substraitPlan = nil
+               s.targetTable = val

Review Comment:
   Do we need to clear `s.prepared` too?



##########
go/adbc/driver/flightsql/flightsql_bulk_ingest.go:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
+       pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+)
+
+// ingestOptions holds the configuration for bulk ingestion operations.
+type ingestOptions struct {
+       targetTable string
+       mode        string
+       catalog     *string
+       dbSchema    *string
+       temporary   bool
+}
+
+// buildTableDefinitionOptions maps ADBC ingest modes to FlightSQL 
TableDefinitionOptions.
+func buildTableDefinitionOptions(mode string) 
*flightsql.TableDefinitionOptions {
+       opts := &flightsql.TableDefinitionOptions{}
+
+       switch mode {
+       case adbc.OptionValueIngestModeCreate:
+               // Create new table, fail if exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       case adbc.OptionValueIngestModeAppend:
+               // Append to existing table, fail if not exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_FAIL
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       case adbc.OptionValueIngestModeReplace:
+               // Replace table if exists, create if not
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_REPLACE
+       case adbc.OptionValueIngestModeCreateAppend:
+               // Create table if not exists, append if exists
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_APPEND
+       default:
+               // Default to create mode
+               opts.IfNotExist = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_NOT_EXIST_OPTION_CREATE
+               opts.IfExists = 
pb.CommandStatementIngest_TableDefinitionOptions_TABLE_EXISTS_OPTION_FAIL
+       }
+
+       return opts
+}
+
+// buildExecuteIngestOpts creates ExecuteIngestOpts from ingestOptions.
+func buildExecuteIngestOpts(opts ingestOptions) *flightsql.ExecuteIngestOpts {
+       tableDefOpts := buildTableDefinitionOptions(opts.mode)
+
+       ingestOpts := &flightsql.ExecuteIngestOpts{
+               TableDefinitionOptions: tableDefOpts,
+               Table:                  opts.targetTable,
+               Temporary:              opts.temporary,
+       }
+
+       if opts.catalog != nil {
+               ingestOpts.Catalog = opts.catalog
+       }
+       if opts.dbSchema != nil {
+               ingestOpts.Schema = opts.dbSchema
+       }
+
+       return ingestOpts
+}
+
+// executeIngestWithReader performs the bulk ingest operation with the given 
record reader.
+func executeIngestWithReader(
+       ctx context.Context,
+       client *flightsql.Client,
+       rdr array.RecordReader,
+       opts *flightsql.ExecuteIngestOpts,
+       callOpts ...grpc.CallOption,
+) (int64, error) {
+       return client.ExecuteIngest(ctx, rdr, opts, callOpts...)
+}

Review Comment:
   This is a function with one line that is only ever used once; just inline it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to