This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new db392368 feat(c/driver): support target catalog/schema for ingestion (#1056)
db392368 is described below
commit db392368d6259f59d1b1165056e229052a00c6c0
Author: David Li <[email protected]>
AuthorDate: Tue Sep 12 15:40:34 2023 -0400
feat(c/driver): support target catalog/schema for ingestion (#1056)
- Support target catalog/schema options for ingestion
- Fix escaping in SQLite, PostgreSQL
Fixes #1000.
---
c/driver/{sqlite/types.h => common/options.h} | 53 +++-----
c/driver/flightsql/dremio_flightsql_test.cc | 3 +
c/driver/flightsql/sqlite_flightsql_test.cc | 3 +
c/driver/postgresql/postgresql_test.cc | 4 +-
c/driver/postgresql/statement.cc | 68 ++++++++--
c/driver/postgresql/statement.h | 3 +-
c/driver/sqlite/sqlite.c | 50 +++++--
c/driver/sqlite/sqlite_test.cc | 5 +-
c/driver/sqlite/types.h | 1 +
c/integration/duckdb/duckdb_test.cc | 3 +
c/validation/adbc_validation.cc | 147 ++++++++++++++++++++-
c/validation/adbc_validation.h | 14 ++
.../adbc_driver_manager/__init__.py | 6 +
.../adbc_driver_manager/dbapi.py | 29 +++-
python/adbc_driver_postgresql/tests/test_dbapi.py | 13 ++
python/adbc_driver_sqlite/tests/test_dbapi.py | 18 +++
r/adbcpostgresql/bootstrap.R | 3 +
r/adbcsqlite/bootstrap.R | 5 +-
18 files changed, 354 insertions(+), 74 deletions(-)
diff --git a/c/driver/sqlite/types.h b/c/driver/common/options.h
similarity index 53%
copy from c/driver/sqlite/types.h
copy to c/driver/common/options.h
index cd46f4f3..f8b64efa 100644
--- a/c/driver/sqlite/types.h
+++ b/c/driver/common/options.h
@@ -15,44 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <adbc.h>
-#include <sqlite3.h>
-
-#include "statement_reader.h"
+/// Common options that haven't yet been formally standardized.
+/// https://github.com/apache/arrow-adbc/issues/1055
-struct SqliteDatabase {
- sqlite3* db;
- char* uri;
- size_t connection_count;
-};
-
-struct SqliteConnection {
- sqlite3* conn;
- char active_transaction;
-};
-
-struct SqliteStatement {
- sqlite3* conn;
-
- // -- Query state -----------------------------------------
+#pragma once
- sqlite3_stmt* stmt;
- char prepared;
- char* query;
- size_t query_len;
+#ifdef __cplusplus
+extern "C" {
+#endif
- // -- Bind state ------------------------------------------
- struct AdbcSqliteBinder binder;
+/// \brief The catalog of the table for bulk insert.
+///
+/// The type is char*.
+#define ADBC_INGEST_OPTION_TARGET_CATALOG "adbc.ingest.target_catalog"
- // -- Ingest state ----------------------------------------
- char* target_table;
- char append;
+/// \brief The schema of the table for bulk insert.
+///
+/// The type is char*.
+#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA "adbc.ingest.target_db_schema"
- // -- Query options ---------------------------------------
- int batch_size;
-};
+#ifdef __cplusplus
+}
+#endif
diff --git a/c/driver/flightsql/dremio_flightsql_test.cc b/c/driver/flightsql/dremio_flightsql_test.cc
index 416b8aea..52c184f3 100644
--- a/c/driver/flightsql/dremio_flightsql_test.cc
+++ b/c/driver/flightsql/dremio_flightsql_test.cc
@@ -89,6 +89,9 @@ class DremioFlightSqlStatementTest : public ::testing::Test,
void TestResultInvalidation() { GTEST_SKIP() << "Dremio generates a
CANCELLED"; }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not
implemented"; }
+ void TestSqlIngestColumnEscaping() {
+ GTEST_SKIP() << "Column escaping not implemented";
+ }
protected:
DremioFlightSqlQuirks quirks_;
diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc b/c/driver/flightsql/sqlite_flightsql_test.cc
index 46ca69be..41a7dedf 100644
--- a/c/driver/flightsql/sqlite_flightsql_test.cc
+++ b/c/driver/flightsql/sqlite_flightsql_test.cc
@@ -268,6 +268,9 @@ class SqliteFlightSqlStatementTest : public ::testing::Test,
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not
implemented"; }
+ void TestSqlIngestColumnEscaping() {
+ GTEST_SKIP() << "Column escaping not implemented";
+ }
void TestSqlIngestInterval() {
GTEST_SKIP() << "Cannot ingest Interval (not implemented)";
}
diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc
index d4449981..4328f8e3 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -55,7 +55,7 @@ class PostgresQuirks : public adbc_validation::DriverQuirks {
AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
if (status != ADBC_STATUS_OK) return status;
- std::string query = "DROP TABLE IF EXISTS " + name;
+ std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
if (status != ADBC_STATUS_OK) {
std::ignore = AdbcStatementRelease(&statement, error);
@@ -111,6 +111,8 @@ class PostgresQuirks : public adbc_validation::DriverQuirks {
std::string catalog() const override { return "postgres"; }
std::string db_schema() const override { return "public"; }
+ bool supports_bulk_ingest_catalog() const override { return false; }
+ bool supports_bulk_ingest_db_schema() const override { return true; }
bool supports_cancel() const override { return true; }
bool supports_execute_schema() const override { return true; }
std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 28069379..9fbbcaf9 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -31,6 +31,7 @@
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.hpp>
+#include "common/options.h"
#include "common/utils.h"
#include "connection.h"
#include "error.h"
@@ -831,7 +832,36 @@ AdbcStatusCode PostgresStatement::Cancel(struct AdbcError* error) {
AdbcStatusCode PostgresStatement::CreateBulkTable(
const struct ArrowSchema& source_schema,
const std::vector<struct ArrowSchemaView>& source_schema_fields,
- struct AdbcError* error) {
+ std::string* escaped_table, struct AdbcError* error) {
+ PGconn* conn = connection_->conn();
+
+ {
+ if (!ingest_.db_schema.empty()) {
+ char* escaped =
+ PQescapeIdentifier(conn, ingest_.db_schema.c_str(),
ingest_.db_schema.size());
+ if (escaped == nullptr) {
+ SetError(error, "[libpq] Failed to escape target schema %s for
ingestion: %s",
+ ingest_.db_schema.c_str(), PQerrorMessage(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ *escaped_table += escaped;
+ *escaped_table += " . ";
+ PQfreemem(escaped);
+ }
+
+ if (!ingest_.target.empty()) {
+ char* escaped =
+ PQescapeIdentifier(conn, ingest_.target.c_str(),
ingest_.target.size());
+ if (escaped == nullptr) {
+ SetError(error, "[libpq] Failed to escape target table %s for
ingestion: %s",
+ ingest_.target.c_str(), PQerrorMessage(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ *escaped_table += escaped;
+ PQfreemem(escaped);
+ }
+ }
+
std::string create = "CREATE TABLE ";
switch (ingest_.mode) {
case IngestMode::kCreate:
@@ -840,15 +870,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
case IngestMode::kAppend:
return ADBC_STATUS_OK;
case IngestMode::kReplace: {
- std::string drop = "DROP TABLE IF EXISTS " + ingest_.target;
- PGresult* result = PQexecParams(connection_->conn(), drop.c_str(),
/*nParams=*/0,
+ std::string drop = "DROP TABLE IF EXISTS " + *escaped_table;
+ PGresult* result = PQexecParams(conn, drop.c_str(), /*nParams=*/0,
/*paramTypes=*/nullptr,
/*paramValues=*/nullptr,
/*paramLengths=*/nullptr,
/*paramFormats=*/nullptr,
/*resultFormat=*/1 /*(binary)*/);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code =
SetError(error, result, "[libpq] Failed to drop table: %s\nQuery
was: %s",
- PQerrorMessage(connection_->conn()), drop.c_str());
+ PQerrorMessage(conn), drop.c_str());
PQclear(result);
return code;
}
@@ -859,12 +889,22 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
create += "IF NOT EXISTS ";
break;
}
- create += ingest_.target;
+ create += *escaped_table;
create += " (";
for (size_t i = 0; i < source_schema_fields.size(); i++) {
if (i > 0) create += ", ";
- create += source_schema.children[i]->name;
+
+ const char* unescaped = source_schema.children[i]->name;
+ char* escaped = PQescapeIdentifier(conn, unescaped,
std::strlen(unescaped));
+ if (escaped == nullptr) {
+ SetError(error, "[libpq] Failed to escape column %s for ingestion: %s",
unescaped,
+ PQerrorMessage(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ create += escaped;
+ PQfreemem(escaped);
+
switch (source_schema_fields[i].type) {
case ArrowType::NANOARROW_TYPE_INT8:
case ArrowType::NANOARROW_TYPE_INT16:
@@ -914,14 +954,14 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
create += ")";
SetError(error, "%s%s", "[libpq] ", create.c_str());
- PGresult* result = PQexecParams(connection_->conn(), create.c_str(),
/*nParams=*/0,
+ PGresult* result = PQexecParams(conn, create.c_str(), /*nParams=*/0,
/*paramTypes=*/nullptr,
/*paramValues=*/nullptr,
/*paramLengths=*/nullptr,
/*paramFormats=*/nullptr,
/*resultFormat=*/1 /*(binary)*/);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code =
SetError(error, result, "[libpq] Failed to create table: %s\nQuery
was: %s",
- PQerrorMessage(connection_->conn()), create.c_str());
+ PQerrorMessage(conn), create.c_str());
PQclear(result);
return code;
}
@@ -1060,16 +1100,17 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
BindStream bind_stream(std::move(bind_));
std::memset(&bind_, 0, sizeof(bind_));
+ std::string escaped_table;
RAISE_ADBC(bind_stream.Begin(
[&]() -> AdbcStatusCode {
return CreateBulkTable(bind_stream.bind_schema.value,
- bind_stream.bind_schema_fields, error);
+ bind_stream.bind_schema_fields, &escaped_table,
error);
},
error));
RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));
std::string insert = "INSERT INTO ";
- insert += ingest_.target;
+ insert += escaped_table;
insert += " VALUES (";
for (size_t i = 0; i < bind_stream.bind_schema_fields.size(); i++) {
if (i > 0) insert += ", ";
@@ -1109,6 +1150,8 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
std::string result;
if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
result = ingest_.target;
+ } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
+ result = ingest_.db_schema;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
switch (ingest_.mode) {
case IngestMode::kCreate:
@@ -1190,6 +1233,7 @@ AdbcStatusCode PostgresStatement::Release(struct AdbcError* error) {
AdbcStatusCode PostgresStatement::SetSqlQuery(const char* query,
struct AdbcError* error) {
ingest_.target.clear();
+ ingest_.db_schema.clear();
query_ = query;
prepared_ = false;
return ADBC_STATUS_OK;
@@ -1201,6 +1245,10 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
query_.clear();
ingest_.target = value;
prepared_ = false;
+ } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
+ query_.clear();
+ ingest_.db_schema = value;
+ prepared_ = false;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
ingest_.mode = IngestMode::kCreate;
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 59d3032c..013334fe 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -128,7 +128,7 @@ class PostgresStatement {
AdbcStatusCode CreateBulkTable(
const struct ArrowSchema& source_schema,
const std::vector<struct ArrowSchemaView>& source_schema_fields,
- struct AdbcError* error);
+ std::string* escaped_table, struct AdbcError* error);
AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError*
error);
AdbcStatusCode ExecuteUpdateQuery(int64_t* rows_affected, struct AdbcError*
error);
AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream,
@@ -154,6 +154,7 @@ class PostgresStatement {
};
struct {
+ std::string db_schema;
std::string target;
IngestMode mode = IngestMode::kCreate;
} ingest_;
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
index 21a53502..2c46976a 100644
--- a/c/driver/sqlite/sqlite.c
+++ b/c/driver/sqlite/sqlite.c
@@ -28,6 +28,7 @@
#include <nanoarrow/nanoarrow.h>
#include <sqlite3.h>
+#include "common/options.h"
#include "common/utils.h"
#include "statement_reader.h"
#include "types.h"
@@ -1025,6 +1026,7 @@ AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
}
if (stmt->query) free(stmt->query);
AdbcSqliteBinderRelease(&stmt->binder);
+ if (stmt->target_catalog) free(stmt->target_catalog);
if (stmt->target_table) free(stmt->target_table);
if (rc != SQLITE_OK) {
SetError(error,
@@ -1079,29 +1081,38 @@ AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
AdbcStatusCode code = ADBC_STATUS_OK;
// Create statements for CREATE TABLE / INSERT
- sqlite3_str* create_query = sqlite3_str_new(NULL);
+ sqlite3_str* create_query = NULL;
+ sqlite3_str* insert_query = NULL;
+ char* table = NULL;
+
+ create_query = sqlite3_str_new(NULL);
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
- sqlite3_free(sqlite3_str_finish(create_query));
- return ADBC_STATUS_INTERNAL;
+ code = ADBC_STATUS_INTERNAL;
+ goto cleanup;
}
- sqlite3_str* insert_query = sqlite3_str_new(NULL);
+ insert_query = sqlite3_str_new(NULL);
if (sqlite3_str_errcode(insert_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
- sqlite3_free(sqlite3_str_finish(create_query));
- sqlite3_free(sqlite3_str_finish(insert_query));
- return ADBC_STATUS_INTERNAL;
+ code = ADBC_STATUS_INTERNAL;
+ goto cleanup;
+ }
+
+ if (stmt->target_catalog != NULL) {
+ table = sqlite3_mprintf("\"%w\" . \"%w\"", stmt->target_catalog,
stmt->target_table);
+ } else {
+ table = sqlite3_mprintf("\"%w\"", stmt->target_table);
}
- sqlite3_str_appendf(create_query, "CREATE TABLE %Q (", stmt->target_table);
+ sqlite3_str_appendf(create_query, "CREATE TABLE %s (", table);
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] Failed to build CREATE: %s",
sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
- sqlite3_str_appendf(insert_query, "INSERT INTO %Q VALUES (",
stmt->target_table);
+ sqlite3_str_appendf(insert_query, "INSERT INTO %s VALUES (", table);
if (sqlite3_str_errcode(insert_query)) {
SetError(error, "[SQLite] Failed to build INSERT: %s",
sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
@@ -1121,7 +1132,7 @@ AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
}
}
- sqlite3_str_appendf(create_query, "%Q",
stmt->binder.schema.children[i]->name);
+ sqlite3_str_appendf(create_query, "\"%w\"",
stmt->binder.schema.children[i]->name);
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] Failed to build CREATE: %s",
sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
@@ -1221,6 +1232,7 @@ AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
cleanup:
sqlite3_free(sqlite3_str_finish(create_query));
sqlite3_free(sqlite3_str_finish(insert_query));
+ if (table != NULL) sqlite3_free(table);
return code;
}
@@ -1347,6 +1359,10 @@ AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
free(stmt->query);
stmt->query = NULL;
}
+ if (stmt->target_catalog) {
+ free(stmt->target_catalog);
+ stmt->target_catalog = NULL;
+ }
if (stmt->target_table) {
free(stmt->target_table);
stmt->target_table = NULL;
@@ -1462,6 +1478,20 @@ AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const c
stmt->target_table = (char*)malloc(len);
strncpy(stmt->target_table, value, len);
return ADBC_STATUS_OK;
+ } else if (strcmp(key, ADBC_INGEST_OPTION_TARGET_CATALOG) == 0) {
+ if (stmt->query) {
+ free(stmt->query);
+ stmt->query = NULL;
+ }
+ if (stmt->target_catalog) {
+ free(stmt->target_catalog);
+ stmt->target_catalog = NULL;
+ }
+
+ size_t len = strlen(value) + 1;
+ stmt->target_catalog = (char*)malloc(len);
+ strncpy(stmt->target_catalog, value, len);
+ return ADBC_STATUS_OK;
} else if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
stmt->append = 1;
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 9f219126..5fdb8628 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -50,7 +50,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
if (status != ADBC_STATUS_OK) return status;
- std::string query = "DROP TABLE IF EXISTS " + name;
+ std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
if (status != ADBC_STATUS_OK) {
std::ignore = AdbcStatementRelease(&statement, error);
@@ -97,6 +97,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
return std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0 ||
std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0;
}
+ bool supports_bulk_ingest_catalog() const override { return true; }
bool supports_concurrent_statements() const override { return true; }
bool supports_get_option() const override { return false; }
std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
@@ -268,7 +269,7 @@ class SqliteStatementTest : public ::testing::Test,
ADBCV_TEST_STATEMENT(SqliteStatementTest)
TEST_F(SqliteStatementTest, SqlIngestNameEscaping) {
- ASSERT_THAT(quirks()->DropTable(&connection, "\"test-table\"", &error),
+ ASSERT_THAT(quirks()->DropTable(&connection, "test-table", &error),
adbc_validation::IsOkStatus(&error));
std::string table = "test-table";
diff --git a/c/driver/sqlite/types.h b/c/driver/sqlite/types.h
index cd46f4f3..805aed46 100644
--- a/c/driver/sqlite/types.h
+++ b/c/driver/sqlite/types.h
@@ -50,6 +50,7 @@ struct SqliteStatement {
struct AdbcSqliteBinder binder;
// -- Ingest state ----------------------------------------
+ char* target_catalog;
char* target_table;
char append;
diff --git a/c/integration/duckdb/duckdb_test.cc b/c/integration/duckdb/duckdb_test.cc
index a373abd8..a6bded03 100644
--- a/c/integration/duckdb/duckdb_test.cc
+++ b/c/integration/duckdb/duckdb_test.cc
@@ -96,6 +96,9 @@ class DuckDbStatementTest : public ::testing::Test,
void TestSqlPrepareErrorNoQuery() { GTEST_SKIP(); }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not
implemented"; }
+ void TestSqlIngestColumnEscaping() {
+ GTEST_SKIP() << "Column escaping not implemented";
+ }
void TestSqlQueryErrors() { GTEST_SKIP() << "DuckDB does not set
AdbcError.release"; }
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 595ffd1f..beebb177 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -37,6 +37,7 @@
#include <nanoarrow/nanoarrow.hpp>
#include "adbc_validation_util.h"
+#include "common/options.h"
namespace adbc_validation {
@@ -1484,8 +1485,7 @@ void StatementTest::TestSqlIngestInterval() {
void StatementTest::TestSqlIngestTableEscaping() {
std::string name = "create_table_escaping";
- ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
- adbc_validation::IsOkStatus(&error));
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
@@ -1495,15 +1495,41 @@ void StatementTest::TestSqlIngestTableEscaping() {
IsOkErrno());
Handle<struct AdbcStatement> statement;
- ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkErrno());
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
- IsOkErrno());
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value,
&error),
- IsOkErrno());
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestColumnEscaping() {
+ std::string name = "create";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"index", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
IsOkErrno());
- ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
}
void StatementTest::TestSqlIngestAppend() {
@@ -1952,6 +1978,115 @@ void StatementTest::TestSqlIngestSample() {
ASSERT_EQ(nullptr, reader.array->release);
}
+void StatementTest::TestSqlIngestTargetCatalog() {
+ if (!quirks()->supports_bulk_ingest_catalog() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+ GTEST_SKIP();
+ }
+
+ std::string catalog = quirks()->catalog();
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_CATALOG,
+ catalog.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTargetSchema() {
+ if (!quirks()->supports_bulk_ingest_db_schema() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+ GTEST_SKIP();
+ }
+
+ std::string db_schema = quirks()->db_schema();
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(
+ AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
+ db_schema.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTargetCatalogSchema() {
+ if (!quirks()->supports_bulk_ingest_catalog() ||
+ !quirks()->supports_bulk_ingest_db_schema() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+ GTEST_SKIP();
+ }
+
+ std::string catalog = quirks()->catalog();
+ std::string db_schema = quirks()->db_schema();
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_CATALOG,
+ catalog.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(
+ AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
+ db_schema.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
void StatementTest::TestSqlPartitionedInts() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 354e695c..56f44ba0 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -91,6 +91,12 @@ class DriverQuirks {
/// \brief Whether bulk ingest is supported
virtual bool supports_bulk_ingest(const char* mode) const { return true; }
+ /// \brief Whether bulk ingest to a specific catalog is supported
+ virtual bool supports_bulk_ingest_catalog() const { return false; }
+
+ /// \brief Whether bulk ingest to a specific schema is supported
+ virtual bool supports_bulk_ingest_db_schema() const { return false; }
+
/// \brief Whether we can cancel queries.
virtual bool supports_cancel() const { return false; }
@@ -284,12 +290,16 @@ class StatementTest {
// ---- End Type-specific tests ----------------
void TestSqlIngestTableEscaping();
+ void TestSqlIngestColumnEscaping();
void TestSqlIngestAppend();
void TestSqlIngestReplace();
void TestSqlIngestCreateAppend();
void TestSqlIngestErrors();
void TestSqlIngestMultipleConnections();
void TestSqlIngestSample();
+ void TestSqlIngestTargetCatalog();
+ void TestSqlIngestTargetSchema();
+ void TestSqlIngestTargetCatalogSchema();
void TestSqlPartitionedInts();
@@ -365,12 +375,16 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); }
\
TEST_F(FIXTURE, SqlIngestInterval) { TestSqlIngestInterval(); }
\
TEST_F(FIXTURE, SqlIngestTableEscaping) { TestSqlIngestTableEscaping(); }
\
+ TEST_F(FIXTURE, SqlIngestColumnEscaping) { TestSqlIngestColumnEscaping(); }
\
TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); }
\
TEST_F(FIXTURE, SqlIngestReplace) { TestSqlIngestReplace(); }
\
TEST_F(FIXTURE, SqlIngestCreateAppend) { TestSqlIngestCreateAppend(); }
\
TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); }
\
TEST_F(FIXTURE, SqlIngestMultipleConnections) {
TestSqlIngestMultipleConnections(); } \
TEST_F(FIXTURE, SqlIngestSample) { TestSqlIngestSample(); }
\
+ TEST_F(FIXTURE, SqlIngestTargetCatalog) { TestSqlIngestTargetCatalog(); }
\
+ TEST_F(FIXTURE, SqlIngestTargetSchema) { TestSqlIngestTargetSchema(); }
\
+ TEST_F(FIXTURE, SqlIngestTargetCatalogSchema) {
TestSqlIngestTargetCatalogSchema(); } \
TEST_F(FIXTURE, SqlPartitionedInts) { TestSqlPartitionedInts(); }
\
TEST_F(FIXTURE, SqlPrepareGetParameterSchema) {
TestSqlPrepareGetParameterSchema(); } \
TEST_F(FIXTURE, SqlPrepareSelectNoParams) { TestSqlPrepareSelectNoParams();
} \
diff --git a/python/adbc_driver_manager/adbc_driver_manager/__init__.py b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
index 25b821eb..336fdb21 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/__init__.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
@@ -122,5 +122,11 @@ class StatementOptions(enum.Enum):
INGEST_MODE = INGEST_OPTION_MODE
#: For bulk ingestion, the table to ingest into.
INGEST_TARGET_TABLE = INGEST_OPTION_TARGET_TABLE
+ #: For bulk ingestion, the catalog to create/locate the table in.
+ #: **This API is EXPERIMENTAL.**
+ INGEST_TARGET_CATALOG = "adbc.ingest.target_catalog"
+ #: For bulk ingestion, the schema to create/locate the table in.
+ #: **This API is EXPERIMENTAL.**
+ INGEST_TARGET_DB_SCHEMA = "adbc.ingest.target_db_schema"
#: Get progress of a query.
PROGRESS = "adbc.statement.exec.progress"
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index f28fbf0e..f358a0fa 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -792,6 +792,9 @@ class Cursor(_Closeable):
table_name: str,
data: Union[pyarrow.RecordBatch, pyarrow.Table,
pyarrow.RecordBatchReader],
mode: Literal["append", "create", "replace", "create_append"] =
"create",
+ *,
+ catalog_name: Optional[str] = None,
+ db_schema_name: Optional[str] = None,
) -> int:
"""
Ingest Arrow data into a database table.
@@ -812,6 +815,12 @@ class Cursor(_Closeable):
- 'create': create a table and insert (error if table exists)
- 'create_append': create a table (if not exists) and insert
- 'replace': drop existing table (if any), then same as 'create'
+ catalog_name
+ If given, the catalog to create/locate the table in.
+ **This API is EXPERIMENTAL.**
+ db_schema_name
+ If given, the schema to create/locate the table in.
+ **This API is EXPERIMENTAL.**
Returns
-------
@@ -833,12 +842,20 @@ class Cursor(_Closeable):
c_mode = _lib.INGEST_OPTION_MODE_REPLACE
else:
raise ValueError(f"Invalid value for 'mode': {mode}")
- self._stmt.set_options(
- **{
- _lib.INGEST_OPTION_TARGET_TABLE: table_name,
- _lib.INGEST_OPTION_MODE: c_mode,
- }
- )
+
+ options = {
+ _lib.INGEST_OPTION_TARGET_TABLE: table_name,
+ _lib.INGEST_OPTION_MODE: c_mode,
+ }
+ if catalog_name is not None:
+ options[
+
adbc_driver_manager.StatementOptions.INGEST_TARGET_CATALOG.value
+ ] = catalog_name
+ if db_schema_name is not None:
+ options[
+
adbc_driver_manager.StatementOptions.INGEST_TARGET_DB_SCHEMA.value
+ ] = db_schema_name
+ self._stmt.set_options(**options)
if isinstance(data, pyarrow.RecordBatch):
array = _lib.ArrowArrayHandle()
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py b/python/adbc_driver_postgresql/tests/test_dbapi.py
index f516ba0e..93779db1 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -250,3 +250,16 @@ def test_reuse(postgres: dbapi.Connection) -> None:
cur.execute("SELECT 2")
assert cur.fetchone() == (2,)
+
+
+def test_ingest(postgres: dbapi.Connection) -> None:
+ table = pyarrow.Table.from_pydict({"numbers": [1, 2], "letters": ["a",
"b"]})
+
+ with postgres.cursor() as cur:
+ cur.adbc_ingest("foo", table, mode="replace", db_schema_name="public")
+
+ cur.execute("SELECT * FROM public.foo")
+ assert cur.fetch_arrow_table() == table
+
+ with pytest.raises(dbapi.NotSupportedError):
+ cur.adbc_ingest("foo", table, catalog_name="main")
diff --git a/python/adbc_driver_sqlite/tests/test_dbapi.py b/python/adbc_driver_sqlite/tests/test_dbapi.py
index 44648ff9..83c07d51 100644
--- a/python/adbc_driver_sqlite/tests/test_dbapi.py
+++ b/python/adbc_driver_sqlite/tests/test_dbapi.py
@@ -85,3 +85,21 @@ def test_create_types(tmp_path: Path) -> None:
assert len(col) == 6
actual_types = [col[2] for col in table_info]
assert actual_types == ["INTEGER", "TEXT"]
+
+
+def test_ingest() -> None:
+ table = pa.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})
+
+ with dbapi.connect() as conn:
+ with conn.cursor() as cur:
+ cur.adbc_ingest("foo", table, catalog_name="main")
+ cur.adbc_ingest("foo", table, catalog_name="temp")
+
+ cur.execute("SELECT * FROM main.foo")
+ assert cur.fetch_arrow_table() == table
+
+ cur.execute("SELECT * FROM temp.foo")
+ assert cur.fetch_arrow_table() == table
+
+ with pytest.raises(dbapi.NotSupportedError):
+ cur.adbc_ingest("foo", table, db_schema_name="main")
diff --git a/r/adbcpostgresql/bootstrap.R b/r/adbcpostgresql/bootstrap.R
index d2473a79..c3e06795 100644
--- a/r/adbcpostgresql/bootstrap.R
+++ b/r/adbcpostgresql/bootstrap.R
@@ -31,6 +31,7 @@ files_to_vendor <- c(
"../../c/driver/postgresql/database.h",
"../../c/driver/postgresql/database.cc",
"../../c/driver/postgresql/postgresql.cc",
+ "../../c/driver/common/options.h",
"../../c/driver/common/utils.h",
"../../c/driver/common/utils.c",
"../../c/vendor/nanoarrow/nanoarrow.h",
@@ -59,6 +60,7 @@ if (all(file.exists(files_to_vendor))) {
"src/nanoarrow.c",
"src/nanoarrow.h",
"src/nanoarrow.hpp",
+ "src/options.h",
"src/utils.c",
"src/utils.h"
),
@@ -66,6 +68,7 @@ if (all(file.exists(files_to_vendor))) {
"src/nanoarrow/nanoarrow.c",
"src/nanoarrow/nanoarrow.h",
"src/nanoarrow/nanoarrow.hpp",
+ "src/common/options.h",
"src/common/utils.c",
"src/common/utils.h"
)
diff --git a/r/adbcsqlite/bootstrap.R b/r/adbcsqlite/bootstrap.R
index 0bda3275..e30c593e 100644
--- a/r/adbcsqlite/bootstrap.R
+++ b/r/adbcsqlite/bootstrap.R
@@ -23,6 +23,7 @@ files_to_vendor <- c(
"../../c/driver/sqlite/statement_reader.c",
"../../c/driver/sqlite/statement_reader.h",
"../../c/driver/sqlite/types.h",
+ "../../c/driver/common/options.h",
"../../c/driver/common/utils.c",
"../../c/driver/common/utils.h",
"../../c/vendor/nanoarrow/nanoarrow.h",
@@ -50,10 +51,10 @@ if (all(file.exists(files_to_vendor))) {
file.rename(
c("src/nanoarrow.c", "src/nanoarrow.h",
"src/sqlite3.c", "src/sqlite3.h",
- "src/utils.c", "src/utils.h"),
+ "src/options.h", "src/utils.c", "src/utils.h"),
c("src/nanoarrow/nanoarrow.c", "src/nanoarrow/nanoarrow.h",
"tools/sqlite3.c", "tools/sqlite3.h",
- "src/common/utils.c", "src/common/utils.h")
+ "src/common/options.h", "src/common/utils.c", "src/common/utils.h")
)
cat("All files successfully copied to src/\n")
} else {