This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new db392368 feat(c/driver): support target catalog/schema for ingestion (#1056)
db392368 is described below

commit db392368d6259f59d1b1165056e229052a00c6c0
Author: David Li <[email protected]>
AuthorDate: Tue Sep 12 15:40:34 2023 -0400

    feat(c/driver): support target catalog/schema for ingestion (#1056)
    
    - Support target catalog/schema options for ingestion
    - Fix escaping in SQLite, PostgreSQL
    
    Fixes #1000.
---
 c/driver/{sqlite/types.h => common/options.h}      |  53 +++-----
 c/driver/flightsql/dremio_flightsql_test.cc        |   3 +
 c/driver/flightsql/sqlite_flightsql_test.cc        |   3 +
 c/driver/postgresql/postgresql_test.cc             |   4 +-
 c/driver/postgresql/statement.cc                   |  68 ++++++++--
 c/driver/postgresql/statement.h                    |   3 +-
 c/driver/sqlite/sqlite.c                           |  50 +++++--
 c/driver/sqlite/sqlite_test.cc                     |   5 +-
 c/driver/sqlite/types.h                            |   1 +
 c/integration/duckdb/duckdb_test.cc                |   3 +
 c/validation/adbc_validation.cc                    | 147 ++++++++++++++++++++-
 c/validation/adbc_validation.h                     |  14 ++
 .../adbc_driver_manager/__init__.py                |   6 +
 .../adbc_driver_manager/dbapi.py                   |  29 +++-
 python/adbc_driver_postgresql/tests/test_dbapi.py  |  13 ++
 python/adbc_driver_sqlite/tests/test_dbapi.py      |  18 +++
 r/adbcpostgresql/bootstrap.R                       |   3 +
 r/adbcsqlite/bootstrap.R                           |   5 +-
 18 files changed, 354 insertions(+), 74 deletions(-)

diff --git a/c/driver/sqlite/types.h b/c/driver/common/options.h
similarity index 53%
copy from c/driver/sqlite/types.h
copy to c/driver/common/options.h
index cd46f4f3..f8b64efa 100644
--- a/c/driver/sqlite/types.h
+++ b/c/driver/common/options.h
@@ -15,44 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <adbc.h>
-#include <sqlite3.h>
-
-#include "statement_reader.h"
+/// Common options that haven't yet been formally standardized.
+/// https://github.com/apache/arrow-adbc/issues/1055
 
-struct SqliteDatabase {
-  sqlite3* db;
-  char* uri;
-  size_t connection_count;
-};
-
-struct SqliteConnection {
-  sqlite3* conn;
-  char active_transaction;
-};
-
-struct SqliteStatement {
-  sqlite3* conn;
-
-  // -- Query state -----------------------------------------
+#pragma once
 
-  sqlite3_stmt* stmt;
-  char prepared;
-  char* query;
-  size_t query_len;
+#ifdef __cplusplus
+extern "C" {
+#endif
 
-  // -- Bind state ------------------------------------------
-  struct AdbcSqliteBinder binder;
+/// \brief The catalog of the table for bulk insert.
+///
+/// The type is char*.
+#define ADBC_INGEST_OPTION_TARGET_CATALOG "adbc.ingest.target_catalog"
 
-  // -- Ingest state ----------------------------------------
-  char* target_table;
-  char append;
+/// \brief The schema of the table for bulk insert.
+///
+/// The type is char*.
+#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA "adbc.ingest.target_db_schema"
 
-  // -- Query options ---------------------------------------
-  int batch_size;
-};
+#ifdef __cplusplus
+}
+#endif
diff --git a/c/driver/flightsql/dremio_flightsql_test.cc 
b/c/driver/flightsql/dremio_flightsql_test.cc
index 416b8aea..52c184f3 100644
--- a/c/driver/flightsql/dremio_flightsql_test.cc
+++ b/c/driver/flightsql/dremio_flightsql_test.cc
@@ -89,6 +89,9 @@ class DremioFlightSqlStatementTest : public ::testing::Test,
 
   void TestResultInvalidation() { GTEST_SKIP() << "Dremio generates a 
CANCELLED"; }
   void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not 
implemented"; }
+  void TestSqlIngestColumnEscaping() {
+    GTEST_SKIP() << "Column escaping not implemented";
+  }
 
  protected:
   DremioFlightSqlQuirks quirks_;
diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc 
b/c/driver/flightsql/sqlite_flightsql_test.cc
index 46ca69be..41a7dedf 100644
--- a/c/driver/flightsql/sqlite_flightsql_test.cc
+++ b/c/driver/flightsql/sqlite_flightsql_test.cc
@@ -268,6 +268,9 @@ class SqliteFlightSqlStatementTest : public ::testing::Test,
   void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }
 
   void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not 
implemented"; }
+  void TestSqlIngestColumnEscaping() {
+    GTEST_SKIP() << "Column escaping not implemented";
+  }
   void TestSqlIngestInterval() {
     GTEST_SKIP() << "Cannot ingest Interval (not implemented)";
   }
diff --git a/c/driver/postgresql/postgresql_test.cc 
b/c/driver/postgresql/postgresql_test.cc
index d4449981..4328f8e3 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -55,7 +55,7 @@ class PostgresQuirks : public adbc_validation::DriverQuirks {
     AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
     if (status != ADBC_STATUS_OK) return status;
 
-    std::string query = "DROP TABLE IF EXISTS " + name;
+    std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
     status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
     if (status != ADBC_STATUS_OK) {
       std::ignore = AdbcStatementRelease(&statement, error);
@@ -111,6 +111,8 @@ class PostgresQuirks : public adbc_validation::DriverQuirks 
{
   std::string catalog() const override { return "postgres"; }
   std::string db_schema() const override { return "public"; }
 
+  bool supports_bulk_ingest_catalog() const override { return false; }
+  bool supports_bulk_ingest_db_schema() const override { return true; }
   bool supports_cancel() const override { return true; }
   bool supports_execute_schema() const override { return true; }
   std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 28069379..9fbbcaf9 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -31,6 +31,7 @@
 #include <libpq-fe.h>
 #include <nanoarrow/nanoarrow.hpp>
 
+#include "common/options.h"
 #include "common/utils.h"
 #include "connection.h"
 #include "error.h"
@@ -831,7 +832,36 @@ AdbcStatusCode PostgresStatement::Cancel(struct AdbcError* 
error) {
 AdbcStatusCode PostgresStatement::CreateBulkTable(
     const struct ArrowSchema& source_schema,
     const std::vector<struct ArrowSchemaView>& source_schema_fields,
-    struct AdbcError* error) {
+    std::string* escaped_table, struct AdbcError* error) {
+  PGconn* conn = connection_->conn();
+
+  {
+    if (!ingest_.db_schema.empty()) {
+      char* escaped =
+          PQescapeIdentifier(conn, ingest_.db_schema.c_str(), 
ingest_.db_schema.size());
+      if (escaped == nullptr) {
+        SetError(error, "[libpq] Failed to escape target schema %s for 
ingestion: %s",
+                 ingest_.db_schema.c_str(), PQerrorMessage(conn));
+        return ADBC_STATUS_INTERNAL;
+      }
+      *escaped_table += escaped;
+      *escaped_table += " . ";
+      PQfreemem(escaped);
+    }
+
+    if (!ingest_.target.empty()) {
+      char* escaped =
+          PQescapeIdentifier(conn, ingest_.target.c_str(), 
ingest_.target.size());
+      if (escaped == nullptr) {
+        SetError(error, "[libpq] Failed to escape target table %s for 
ingestion: %s",
+                 ingest_.target.c_str(), PQerrorMessage(conn));
+        return ADBC_STATUS_INTERNAL;
+      }
+      *escaped_table += escaped;
+      PQfreemem(escaped);
+    }
+  }
+
   std::string create = "CREATE TABLE ";
   switch (ingest_.mode) {
     case IngestMode::kCreate:
@@ -840,15 +870,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
     case IngestMode::kAppend:
       return ADBC_STATUS_OK;
     case IngestMode::kReplace: {
-      std::string drop = "DROP TABLE IF EXISTS " + ingest_.target;
-      PGresult* result = PQexecParams(connection_->conn(), drop.c_str(), 
/*nParams=*/0,
+      std::string drop = "DROP TABLE IF EXISTS " + *escaped_table;
+      PGresult* result = PQexecParams(conn, drop.c_str(), /*nParams=*/0,
                                       /*paramTypes=*/nullptr, 
/*paramValues=*/nullptr,
                                       /*paramLengths=*/nullptr, 
/*paramFormats=*/nullptr,
                                       /*resultFormat=*/1 /*(binary)*/);
       if (PQresultStatus(result) != PGRES_COMMAND_OK) {
         AdbcStatusCode code =
             SetError(error, result, "[libpq] Failed to drop table: %s\nQuery 
was: %s",
-                     PQerrorMessage(connection_->conn()), drop.c_str());
+                     PQerrorMessage(conn), drop.c_str());
         PQclear(result);
         return code;
       }
@@ -859,12 +889,22 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
       create += "IF NOT EXISTS ";
       break;
   }
-  create += ingest_.target;
+  create += *escaped_table;
   create += " (";
 
   for (size_t i = 0; i < source_schema_fields.size(); i++) {
     if (i > 0) create += ", ";
-    create += source_schema.children[i]->name;
+
+    const char* unescaped = source_schema.children[i]->name;
+    char* escaped = PQescapeIdentifier(conn, unescaped, 
std::strlen(unescaped));
+    if (escaped == nullptr) {
+      SetError(error, "[libpq] Failed to escape column %s for ingestion: %s", 
unescaped,
+               PQerrorMessage(conn));
+      return ADBC_STATUS_INTERNAL;
+    }
+    create += escaped;
+    PQfreemem(escaped);
+
     switch (source_schema_fields[i].type) {
       case ArrowType::NANOARROW_TYPE_INT8:
       case ArrowType::NANOARROW_TYPE_INT16:
@@ -914,14 +954,14 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
 
   create += ")";
   SetError(error, "%s%s", "[libpq] ", create.c_str());
-  PGresult* result = PQexecParams(connection_->conn(), create.c_str(), 
/*nParams=*/0,
+  PGresult* result = PQexecParams(conn, create.c_str(), /*nParams=*/0,
                                   /*paramTypes=*/nullptr, 
/*paramValues=*/nullptr,
                                   /*paramLengths=*/nullptr, 
/*paramFormats=*/nullptr,
                                   /*resultFormat=*/1 /*(binary)*/);
   if (PQresultStatus(result) != PGRES_COMMAND_OK) {
     AdbcStatusCode code =
         SetError(error, result, "[libpq] Failed to create table: %s\nQuery 
was: %s",
-                 PQerrorMessage(connection_->conn()), create.c_str());
+                 PQerrorMessage(conn), create.c_str());
     PQclear(result);
     return code;
   }
@@ -1060,16 +1100,17 @@ AdbcStatusCode 
PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
 
   BindStream bind_stream(std::move(bind_));
   std::memset(&bind_, 0, sizeof(bind_));
+  std::string escaped_table;
   RAISE_ADBC(bind_stream.Begin(
       [&]() -> AdbcStatusCode {
         return CreateBulkTable(bind_stream.bind_schema.value,
-                               bind_stream.bind_schema_fields, error);
+                               bind_stream.bind_schema_fields, &escaped_table, 
error);
       },
       error));
   RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));
 
   std::string insert = "INSERT INTO ";
-  insert += ingest_.target;
+  insert += escaped_table;
   insert += " VALUES (";
   for (size_t i = 0; i < bind_stream.bind_schema_fields.size(); i++) {
     if (i > 0) insert += ", ";
@@ -1109,6 +1150,8 @@ AdbcStatusCode PostgresStatement::GetOption(const char* 
key, char* value, size_t
   std::string result;
   if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
     result = ingest_.target;
+  } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
+    result = ingest_.db_schema;
   } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
     switch (ingest_.mode) {
       case IngestMode::kCreate:
@@ -1190,6 +1233,7 @@ AdbcStatusCode PostgresStatement::Release(struct 
AdbcError* error) {
 AdbcStatusCode PostgresStatement::SetSqlQuery(const char* query,
                                               struct AdbcError* error) {
   ingest_.target.clear();
+  ingest_.db_schema.clear();
   query_ = query;
   prepared_ = false;
   return ADBC_STATUS_OK;
@@ -1201,6 +1245,10 @@ AdbcStatusCode PostgresStatement::SetOption(const char* 
key, const char* value,
     query_.clear();
     ingest_.target = value;
     prepared_ = false;
+  } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
+    query_.clear();
+    ingest_.db_schema = value;
+    prepared_ = false;
   } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
     if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
       ingest_.mode = IngestMode::kCreate;
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 59d3032c..013334fe 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -128,7 +128,7 @@ class PostgresStatement {
   AdbcStatusCode CreateBulkTable(
       const struct ArrowSchema& source_schema,
       const std::vector<struct ArrowSchemaView>& source_schema_fields,
-      struct AdbcError* error);
+      std::string* escaped_table, struct AdbcError* error);
   AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError* 
error);
   AdbcStatusCode ExecuteUpdateQuery(int64_t* rows_affected, struct AdbcError* 
error);
   AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream,
@@ -154,6 +154,7 @@ class PostgresStatement {
   };
 
   struct {
+    std::string db_schema;
     std::string target;
     IngestMode mode = IngestMode::kCreate;
   } ingest_;
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
index 21a53502..2c46976a 100644
--- a/c/driver/sqlite/sqlite.c
+++ b/c/driver/sqlite/sqlite.c
@@ -28,6 +28,7 @@
 #include <nanoarrow/nanoarrow.h>
 #include <sqlite3.h>
 
+#include "common/options.h"
 #include "common/utils.h"
 #include "statement_reader.h"
 #include "types.h"
@@ -1025,6 +1026,7 @@ AdbcStatusCode SqliteStatementRelease(struct 
AdbcStatement* statement,
   }
   if (stmt->query) free(stmt->query);
   AdbcSqliteBinderRelease(&stmt->binder);
+  if (stmt->target_catalog) free(stmt->target_catalog);
   if (stmt->target_table) free(stmt->target_table);
   if (rc != SQLITE_OK) {
     SetError(error,
@@ -1079,29 +1081,38 @@ AdbcStatusCode SqliteStatementInitIngest(struct 
SqliteStatement* stmt,
   AdbcStatusCode code = ADBC_STATUS_OK;
 
   // Create statements for CREATE TABLE / INSERT
-  sqlite3_str* create_query = sqlite3_str_new(NULL);
+  sqlite3_str* create_query = NULL;
+  sqlite3_str* insert_query = NULL;
+  char* table = NULL;
+
+  create_query = sqlite3_str_new(NULL);
   if (sqlite3_str_errcode(create_query)) {
     SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
-    sqlite3_free(sqlite3_str_finish(create_query));
-    return ADBC_STATUS_INTERNAL;
+    code = ADBC_STATUS_INTERNAL;
+    goto cleanup;
   }
 
-  sqlite3_str* insert_query = sqlite3_str_new(NULL);
+  insert_query = sqlite3_str_new(NULL);
   if (sqlite3_str_errcode(insert_query)) {
     SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
-    sqlite3_free(sqlite3_str_finish(create_query));
-    sqlite3_free(sqlite3_str_finish(insert_query));
-    return ADBC_STATUS_INTERNAL;
+    code = ADBC_STATUS_INTERNAL;
+    goto cleanup;
+  }
+
+  if (stmt->target_catalog != NULL) {
+    table = sqlite3_mprintf("\"%w\" . \"%w\"", stmt->target_catalog, 
stmt->target_table);
+  } else {
+    table = sqlite3_mprintf("\"%w\"", stmt->target_table);
   }
 
-  sqlite3_str_appendf(create_query, "CREATE TABLE %Q (", stmt->target_table);
+  sqlite3_str_appendf(create_query, "CREATE TABLE %s (", table);
   if (sqlite3_str_errcode(create_query)) {
     SetError(error, "[SQLite] Failed to build CREATE: %s", 
sqlite3_errmsg(stmt->conn));
     code = ADBC_STATUS_INTERNAL;
     goto cleanup;
   }
 
-  sqlite3_str_appendf(insert_query, "INSERT INTO %Q VALUES (", 
stmt->target_table);
+  sqlite3_str_appendf(insert_query, "INSERT INTO %s VALUES (", table);
   if (sqlite3_str_errcode(insert_query)) {
     SetError(error, "[SQLite] Failed to build INSERT: %s", 
sqlite3_errmsg(stmt->conn));
     code = ADBC_STATUS_INTERNAL;
@@ -1121,7 +1132,7 @@ AdbcStatusCode SqliteStatementInitIngest(struct 
SqliteStatement* stmt,
       }
     }
 
-    sqlite3_str_appendf(create_query, "%Q", 
stmt->binder.schema.children[i]->name);
+    sqlite3_str_appendf(create_query, "\"%w\"", 
stmt->binder.schema.children[i]->name);
     if (sqlite3_str_errcode(create_query)) {
       SetError(error, "[SQLite] Failed to build CREATE: %s", 
sqlite3_errmsg(stmt->conn));
       code = ADBC_STATUS_INTERNAL;
@@ -1221,6 +1232,7 @@ AdbcStatusCode SqliteStatementInitIngest(struct 
SqliteStatement* stmt,
 cleanup:
   sqlite3_free(sqlite3_str_finish(create_query));
   sqlite3_free(sqlite3_str_finish(insert_query));
+  if (table != NULL) sqlite3_free(table);
   return code;
 }
 
@@ -1347,6 +1359,10 @@ AdbcStatusCode SqliteStatementSetSqlQuery(struct 
AdbcStatement* statement,
     free(stmt->query);
     stmt->query = NULL;
   }
+  if (stmt->target_catalog) {
+    free(stmt->target_catalog);
+    stmt->target_catalog = NULL;
+  }
   if (stmt->target_table) {
     free(stmt->target_table);
     stmt->target_table = NULL;
@@ -1462,6 +1478,20 @@ AdbcStatusCode SqliteStatementSetOption(struct 
AdbcStatement* statement, const c
     stmt->target_table = (char*)malloc(len);
     strncpy(stmt->target_table, value, len);
     return ADBC_STATUS_OK;
+  } else if (strcmp(key, ADBC_INGEST_OPTION_TARGET_CATALOG) == 0) {
+    if (stmt->query) {
+      free(stmt->query);
+      stmt->query = NULL;
+    }
+    if (stmt->target_catalog) {
+      free(stmt->target_catalog);
+      stmt->target_catalog = NULL;
+    }
+
+    size_t len = strlen(value) + 1;
+    stmt->target_catalog = (char*)malloc(len);
+    strncpy(stmt->target_catalog, value, len);
+    return ADBC_STATUS_OK;
   } else if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
     if (strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
       stmt->append = 1;
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 9f219126..5fdb8628 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -50,7 +50,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
     AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
     if (status != ADBC_STATUS_OK) return status;
 
-    std::string query = "DROP TABLE IF EXISTS " + name;
+    std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
     status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
     if (status != ADBC_STATUS_OK) {
       std::ignore = AdbcStatementRelease(&statement, error);
@@ -97,6 +97,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
     return std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0 ||
            std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0;
   }
+  bool supports_bulk_ingest_catalog() const override { return true; }
   bool supports_concurrent_statements() const override { return true; }
   bool supports_get_option() const override { return false; }
   std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
@@ -268,7 +269,7 @@ class SqliteStatementTest : public ::testing::Test,
 ADBCV_TEST_STATEMENT(SqliteStatementTest)
 
 TEST_F(SqliteStatementTest, SqlIngestNameEscaping) {
-  ASSERT_THAT(quirks()->DropTable(&connection, "\"test-table\"", &error),
+  ASSERT_THAT(quirks()->DropTable(&connection, "test-table", &error),
               adbc_validation::IsOkStatus(&error));
 
   std::string table = "test-table";
diff --git a/c/driver/sqlite/types.h b/c/driver/sqlite/types.h
index cd46f4f3..805aed46 100644
--- a/c/driver/sqlite/types.h
+++ b/c/driver/sqlite/types.h
@@ -50,6 +50,7 @@ struct SqliteStatement {
   struct AdbcSqliteBinder binder;
 
   // -- Ingest state ----------------------------------------
+  char* target_catalog;
   char* target_table;
   char append;
 
diff --git a/c/integration/duckdb/duckdb_test.cc 
b/c/integration/duckdb/duckdb_test.cc
index a373abd8..a6bded03 100644
--- a/c/integration/duckdb/duckdb_test.cc
+++ b/c/integration/duckdb/duckdb_test.cc
@@ -96,6 +96,9 @@ class DuckDbStatementTest : public ::testing::Test,
   void TestSqlPrepareErrorNoQuery() { GTEST_SKIP(); }
 
   void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not 
implemented"; }
+  void TestSqlIngestColumnEscaping() {
+    GTEST_SKIP() << "Column escaping not implemented";
+  }
 
   void TestSqlQueryErrors() { GTEST_SKIP() << "DuckDB does not set 
AdbcError.release"; }
 
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 595ffd1f..beebb177 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -37,6 +37,7 @@
 #include <nanoarrow/nanoarrow.hpp>
 
 #include "adbc_validation_util.h"
+#include "common/options.h"
 
 namespace adbc_validation {
 
@@ -1484,8 +1485,7 @@ void StatementTest::TestSqlIngestInterval() {
 void StatementTest::TestSqlIngestTableEscaping() {
   std::string name = "create_table_escaping";
 
-  ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
-              adbc_validation::IsOkStatus(&error));
+  ASSERT_THAT(quirks()->DropTable(&connection, name, &error), 
IsOkStatus(&error));
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
@@ -1495,15 +1495,41 @@ void StatementTest::TestSqlIngestTableEscaping() {
               IsOkErrno());
 
   Handle<struct AdbcStatement> statement;
-  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), 
IsOkErrno());
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+              IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_TABLE,
                                      name.c_str(), &error),
-              IsOkErrno());
+              IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, 
&error),
-              IsOkErrno());
+              IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), 
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestColumnEscaping() {
+  std::string name = "create";
+
+  ASSERT_THAT(quirks()->DropTable(&connection, name, &error), 
IsOkStatus(&error));
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_THAT(MakeSchema(&schema.value, {{"index", NANOARROW_TYPE_INT64}}), 
IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
               IsOkErrno());
-  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkErrno());
+
+  Handle<struct AdbcStatement> statement;
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     name.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), 
IsOkStatus(&error));
 }
 
 void StatementTest::TestSqlIngestAppend() {
@@ -1952,6 +1978,115 @@ void StatementTest::TestSqlIngestSample() {
   ASSERT_EQ(nullptr, reader.array->release);
 }
 
+void StatementTest::TestSqlIngestTargetCatalog() {
+  if (!quirks()->supports_bulk_ingest_catalog() ||
+      !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+    GTEST_SKIP();
+  }
+
+  std::string catalog = quirks()->catalog();
+  std::string name = "bulk_ingest";
+
+  ASSERT_THAT(quirks()->DropTable(&connection, name, &error), 
IsOkStatus(&error));
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), 
IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
+
+  Handle<struct AdbcStatement> statement;
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_CATALOG,
+                                     catalog.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     name.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), 
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTargetSchema() {
+  if (!quirks()->supports_bulk_ingest_db_schema() ||
+      !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+    GTEST_SKIP();
+  }
+
+  std::string db_schema = quirks()->db_schema();
+  std::string name = "bulk_ingest";
+
+  ASSERT_THAT(quirks()->DropTable(&connection, name, &error), 
IsOkStatus(&error));
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), 
IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
+
+  Handle<struct AdbcStatement> statement;
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(
+      AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
+                             db_schema.c_str(), &error),
+      IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     name.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), 
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTargetCatalogSchema() {
+  if (!quirks()->supports_bulk_ingest_catalog() ||
+      !quirks()->supports_bulk_ingest_db_schema() ||
+      !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+    GTEST_SKIP();
+  }
+
+  std::string catalog = quirks()->catalog();
+  std::string db_schema = quirks()->db_schema();
+  std::string name = "bulk_ingest";
+
+  ASSERT_THAT(quirks()->DropTable(&connection, name, &error), 
IsOkStatus(&error));
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), 
IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
+
+  Handle<struct AdbcStatement> statement;
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_CATALOG,
+                                     catalog.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(
+      AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
+                             db_schema.c_str(), &error),
+      IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement.value, 
ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     name.c_str(), &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, 
&error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), 
IsOkStatus(&error));
+}
+
 void StatementTest::TestSqlPartitionedInts() {
   ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 354e695c..56f44ba0 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -91,6 +91,12 @@ class DriverQuirks {
   /// \brief Whether bulk ingest is supported
   virtual bool supports_bulk_ingest(const char* mode) const { return true; }
 
+  /// \brief Whether bulk ingest to a specific catalog is supported
+  virtual bool supports_bulk_ingest_catalog() const { return false; }
+
+  /// \brief Whether bulk ingest to a specific schema is supported
+  virtual bool supports_bulk_ingest_db_schema() const { return false; }
+
   /// \brief Whether we can cancel queries.
   virtual bool supports_cancel() const { return false; }
 
@@ -284,12 +290,16 @@ class StatementTest {
   // ---- End Type-specific tests ----------------
 
   void TestSqlIngestTableEscaping();
+  void TestSqlIngestColumnEscaping();
   void TestSqlIngestAppend();
   void TestSqlIngestReplace();
   void TestSqlIngestCreateAppend();
   void TestSqlIngestErrors();
   void TestSqlIngestMultipleConnections();
   void TestSqlIngestSample();
+  void TestSqlIngestTargetCatalog();
+  void TestSqlIngestTargetSchema();
+  void TestSqlIngestTargetCatalogSchema();
 
   void TestSqlPartitionedInts();
 
@@ -365,12 +375,16 @@ class StatementTest {
   TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); }        
         \
   TEST_F(FIXTURE, SqlIngestInterval) { TestSqlIngestInterval(); }              
         \
   TEST_F(FIXTURE, SqlIngestTableEscaping) { TestSqlIngestTableEscaping(); }    
         \
+  TEST_F(FIXTURE, SqlIngestColumnEscaping) { TestSqlIngestColumnEscaping(); }  
         \
   TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); }                  
         \
   TEST_F(FIXTURE, SqlIngestReplace) { TestSqlIngestReplace(); }                
         \
   TEST_F(FIXTURE, SqlIngestCreateAppend) { TestSqlIngestCreateAppend(); }      
         \
   TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); }                  
         \
   TEST_F(FIXTURE, SqlIngestMultipleConnections) { 
TestSqlIngestMultipleConnections(); } \
   TEST_F(FIXTURE, SqlIngestSample) { TestSqlIngestSample(); }                  
         \
+  TEST_F(FIXTURE, SqlIngestTargetCatalog) { TestSqlIngestTargetCatalog(); }    
         \
+  TEST_F(FIXTURE, SqlIngestTargetSchema) { TestSqlIngestTargetSchema(); }      
         \
+  TEST_F(FIXTURE, SqlIngestTargetCatalogSchema) { 
TestSqlIngestTargetCatalogSchema(); } \
   TEST_F(FIXTURE, SqlPartitionedInts) { TestSqlPartitionedInts(); }            
         \
   TEST_F(FIXTURE, SqlPrepareGetParameterSchema) { 
TestSqlPrepareGetParameterSchema(); } \
   TEST_F(FIXTURE, SqlPrepareSelectNoParams) { TestSqlPrepareSelectNoParams(); 
}         \
diff --git a/python/adbc_driver_manager/adbc_driver_manager/__init__.py 
b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
index 25b821eb..336fdb21 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/__init__.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
@@ -122,5 +122,11 @@ class StatementOptions(enum.Enum):
     INGEST_MODE = INGEST_OPTION_MODE
     #: For bulk ingestion, the table to ingest into.
     INGEST_TARGET_TABLE = INGEST_OPTION_TARGET_TABLE
+    #: For bulk ingestion, the catalog to create/locate the table in.
+    #: **This API is EXPERIMENTAL.**
+    INGEST_TARGET_CATALOG = "adbc.ingest.target_catalog"
+    #: For bulk ingestion, the schema to create/locate the table in.
+    #: **This API is EXPERIMENTAL.**
+    INGEST_TARGET_DB_SCHEMA = "adbc.ingest.target_db_schema"
     #: Get progress of a query.
     PROGRESS = "adbc.statement.exec.progress"
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index f28fbf0e..f358a0fa 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -792,6 +792,9 @@ class Cursor(_Closeable):
         table_name: str,
         data: Union[pyarrow.RecordBatch, pyarrow.Table, pyarrow.RecordBatchReader],
         mode: Literal["append", "create", "replace", "create_append"] = "create",
+        *,
+        catalog_name: Optional[str] = None,
+        db_schema_name: Optional[str] = None,
     ) -> int:
         """
         Ingest Arrow data into a database table.
@@ -812,6 +815,12 @@ class Cursor(_Closeable):
             - 'create': create a table and insert (error if table exists)
             - 'create_append': create a table (if not exists) and insert
             - 'replace': drop existing table (if any), then same as 'create'
+        catalog_name
+            If given, the catalog to create/locate the table in.
+            **This API is EXPERIMENTAL.**
+        db_schema_name
+            If given, the schema to create/locate the table in.
+            **This API is EXPERIMENTAL.**
 
         Returns
         -------
@@ -833,12 +842,20 @@ class Cursor(_Closeable):
             c_mode = _lib.INGEST_OPTION_MODE_REPLACE
         else:
             raise ValueError(f"Invalid value for 'mode': {mode}")
-        self._stmt.set_options(
-            **{
-                _lib.INGEST_OPTION_TARGET_TABLE: table_name,
-                _lib.INGEST_OPTION_MODE: c_mode,
-            }
-        )
+
+        options = {
+            _lib.INGEST_OPTION_TARGET_TABLE: table_name,
+            _lib.INGEST_OPTION_MODE: c_mode,
+        }
+        if catalog_name is not None:
+            options[
+                adbc_driver_manager.StatementOptions.INGEST_TARGET_CATALOG.value
+            ] = catalog_name
+        if db_schema_name is not None:
+            options[
+                adbc_driver_manager.StatementOptions.INGEST_TARGET_DB_SCHEMA.value
+            ] = db_schema_name
+        self._stmt.set_options(**options)
 
         if isinstance(data, pyarrow.RecordBatch):
             array = _lib.ArrowArrayHandle()
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py b/python/adbc_driver_postgresql/tests/test_dbapi.py
index f516ba0e..93779db1 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -250,3 +250,16 @@ def test_reuse(postgres: dbapi.Connection) -> None:
 
         cur.execute("SELECT 2")
         assert cur.fetchone() == (2,)
+
+
+def test_ingest(postgres: dbapi.Connection) -> None:
+    table = pyarrow.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})
+
+    with postgres.cursor() as cur:
+        cur.adbc_ingest("foo", table, mode="replace", db_schema_name="public")
+
+        cur.execute("SELECT * FROM public.foo")
+        assert cur.fetch_arrow_table() == table
+
+        with pytest.raises(dbapi.NotSupportedError):
+            cur.adbc_ingest("foo", table, catalog_name="main")
diff --git a/python/adbc_driver_sqlite/tests/test_dbapi.py b/python/adbc_driver_sqlite/tests/test_dbapi.py
index 44648ff9..83c07d51 100644
--- a/python/adbc_driver_sqlite/tests/test_dbapi.py
+++ b/python/adbc_driver_sqlite/tests/test_dbapi.py
@@ -85,3 +85,21 @@ def test_create_types(tmp_path: Path) -> None:
                 assert len(col) == 6
             actual_types = [col[2] for col in table_info]
             assert actual_types == ["INTEGER", "TEXT"]
+
+
+def test_ingest() -> None:
+    table = pa.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})
+
+    with dbapi.connect() as conn:
+        with conn.cursor() as cur:
+            cur.adbc_ingest("foo", table, catalog_name="main")
+            cur.adbc_ingest("foo", table, catalog_name="temp")
+
+            cur.execute("SELECT * FROM main.foo")
+            assert cur.fetch_arrow_table() == table
+
+            cur.execute("SELECT * FROM temp.foo")
+            assert cur.fetch_arrow_table() == table
+
+            with pytest.raises(dbapi.NotSupportedError):
+                cur.adbc_ingest("foo", table, db_schema_name="main")
diff --git a/r/adbcpostgresql/bootstrap.R b/r/adbcpostgresql/bootstrap.R
index d2473a79..c3e06795 100644
--- a/r/adbcpostgresql/bootstrap.R
+++ b/r/adbcpostgresql/bootstrap.R
@@ -31,6 +31,7 @@ files_to_vendor <- c(
   "../../c/driver/postgresql/database.h",
   "../../c/driver/postgresql/database.cc",
   "../../c/driver/postgresql/postgresql.cc",
+  "../../c/driver/common/options.h",
   "../../c/driver/common/utils.h",
   "../../c/driver/common/utils.c",
   "../../c/vendor/nanoarrow/nanoarrow.h",
@@ -59,6 +60,7 @@ if (all(file.exists(files_to_vendor))) {
         "src/nanoarrow.c",
         "src/nanoarrow.h",
         "src/nanoarrow.hpp",
+        "src/options.h",
         "src/utils.c",
         "src/utils.h"
       ),
@@ -66,6 +68,7 @@ if (all(file.exists(files_to_vendor))) {
         "src/nanoarrow/nanoarrow.c",
         "src/nanoarrow/nanoarrow.h",
         "src/nanoarrow/nanoarrow.hpp",
+        "src/common/options.h",
         "src/common/utils.c",
         "src/common/utils.h"
       )
diff --git a/r/adbcsqlite/bootstrap.R b/r/adbcsqlite/bootstrap.R
index 0bda3275..e30c593e 100644
--- a/r/adbcsqlite/bootstrap.R
+++ b/r/adbcsqlite/bootstrap.R
@@ -23,6 +23,7 @@ files_to_vendor <- c(
   "../../c/driver/sqlite/statement_reader.c",
   "../../c/driver/sqlite/statement_reader.h",
   "../../c/driver/sqlite/types.h",
+  "../../c/driver/common/options.h",
   "../../c/driver/common/utils.c",
   "../../c/driver/common/utils.h",
   "../../c/vendor/nanoarrow/nanoarrow.h",
@@ -50,10 +51,10 @@ if (all(file.exists(files_to_vendor))) {
     file.rename(
       c("src/nanoarrow.c", "src/nanoarrow.h",
         "src/sqlite3.c", "src/sqlite3.h",
-        "src/utils.c", "src/utils.h"),
+        "src/options.h", "src/utils.c", "src/utils.h"),
       c("src/nanoarrow/nanoarrow.c", "src/nanoarrow/nanoarrow.h",
         "tools/sqlite3.c", "tools/sqlite3.h",
-        "src/common/utils.c", "src/common/utils.h")
+        "src/common/options.h", "src/common/utils.c", "src/common/utils.h")
     )
     cat("All files successfully copied to src/\n")
   } else {


Reply via email to