This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 05fa60d64 feat(c/driver/postgresql): Implement consuming a PGresult via the copy reader (#2029)
05fa60d64 is described below
commit 05fa60d643c66b572d426ab28aa78fc52e9520e8
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Aug 6 21:52:23 2024 -0300
feat(c/driver/postgresql): Implement consuming a PGresult via the copy reader (#2029)
I started this PR wanting parameterized queries to be able to return
their results; however, it turned into a PR that leans into
`PqResultHelper`, because that helper was useful for exporting arrays
from a `PGresult*` but wasn't quite general enough. I also did a second
round of reshuffling to make it easier (possibly, or maybe just for me)
to understand which path `ExecuteQuery()` takes.
Side effects of these changes are that we now support multiple
statements in a single query (by using `PQexec()` instead of
`PQexecParams()` when no output is requested) and that
`ExecuteSchema()` now works for all parameterized queries.
The user-facing feature is that setting
`adbc.postgresql.use_copy = FALSE` forces a non-COPY path for queries
that COPY does not support. Because we request binary data, we can reuse
all of the same infrastructure for converting the results! I have only
one test for this, although I did run the whole test suite in C++ and
Python. There are still a few missing features (batch size hint, large
string overflow, error detail, cancel), but most tests pass using either
path.
I'm happy to split this up if that makes review easier! I'm also
planning to document the helper, but wanted a first round of review
before documenting its behaviour, to make sure it's behaviour we
actually want.
Closes #855, Closes #2035.
``` r
library(adbcdrivermanager)
#> Warning: package 'adbcdrivermanager' was built under R version 4.3.3
con <- adbc_database_init(
adbcpostgresql::adbcpostgresql(),
uri =
"postgresql://localhost:5432/postgres?user=postgres&password=password"
) |>
adbc_connection_init()
nycflights13::flights |>
write_adbc(con, "flights")
stream <- nanoarrow::nanoarrow_allocate_array_stream()
rows <- con |>
adbc_statement_init(adbc.postgresql.use_copy = FALSE) |>
adbc_statement_set_sql_query(
"SELECT * from flights where month = 1 AND day = 1"
) |>
adbc_statement_prepare() |>
adbc_statement_execute_query(stream)
rows
#> [1] 842
tibble::as_tibble(stream)
#> # A tibble: 842 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # ℹ 832 more rows
#> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>
con |>
execute_adbc("DROP TABLE flights")
```
<sup>Created on 2024-07-25 with [reprex
v2.1.0](https://reprex.tidyverse.org)</sup>
---
c/driver/postgresql/connection.cc | 55 ++--
c/driver/postgresql/postgresql_test.cc | 130 ++++++++-
c/driver/postgresql/result_helper.cc | 338 ++++++++++++++++++++--
c/driver/postgresql/result_helper.h | 110 ++++++-
c/driver/postgresql/statement.cc | 298 ++++++++-----------
c/driver/postgresql/statement.h | 21 +-
c/validation/adbc_validation_util.cc | 14 +
c/validation/adbc_validation_util.h | 4 +
python/adbc_driver_postgresql/tests/test_dbapi.py | 2 +-
9 files changed, 707 insertions(+), 265 deletions(-)
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index a918188bf..b5c0ef161 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -145,12 +145,10 @@ class PqGetObjectsHelper {
params.push_back(db_schema_);
}
- auto result_helper =
- PqResultHelper{conn_, std::string(query.buffer), params, error_};
+ auto result_helper = PqResultHelper{conn_, std::string(query.buffer)};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* schema_name = row[0].data;
@@ -188,12 +186,10 @@ class PqGetObjectsHelper {
params.push_back(catalog_);
}
- PqResultHelper result_helper =
- PqResultHelper{conn_, std::string(query.buffer), params, error_};
+ PqResultHelper result_helper = PqResultHelper{conn_,
std::string(query.buffer)};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* db_name = row[0].data;
@@ -280,11 +276,10 @@ class PqGetObjectsHelper {
}
}
- auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+ auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* table_name = row[0].data;
const char* table_type = row[1].data;
@@ -341,11 +336,10 @@ class PqGetObjectsHelper {
params.push_back(std::string(column_name_));
}
- auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+ auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* column_name = row[0].data;
@@ -493,11 +487,10 @@ class PqGetObjectsHelper {
params.push_back(std::string(column_name_));
}
- auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+ auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* constraint_name = row[0].data;
@@ -655,9 +648,8 @@ AdbcStatusCode
PostgresConnection::PostgresConnectionGetInfoImpl(
break;
case ADBC_INFO_VENDOR_VERSION: {
const char* stmt = "SHOW server_version_num";
- auto result_helper = PqResultHelper{conn_, std::string(stmt), error};
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ auto result_helper = PqResultHelper{conn_, std::string(stmt)};
+ RAISE_ADBC(result_helper.Execute(error));
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for '%s'",
stmt);
@@ -760,9 +752,8 @@ AdbcStatusCode PostgresConnection::GetOption(const char*
option, char* value,
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) ==
0) {
- PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"};
+ RAISE_ADBC(result_helper.Execute(error));
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
@@ -931,10 +922,8 @@ AdbcStatusCode PostgresConnectionGetStatisticsImpl(PGconn*
conn, const char* db_
std::string prev_table;
{
- PqResultHelper result_helper{
- conn, query, {db_schema, table_name ? table_name : "%"}, error};
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ PqResultHelper result_helper{conn, query};
+ RAISE_ADBC(result_helper.Execute(error, {db_schema, table_name ?
table_name : "%"}));
for (PqResultRow row : result_helper) {
auto reltuples = row[5].ParseDouble();
@@ -1166,11 +1155,9 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const
char* catalog,
std::vector<std::string> params = {table_name_str};
- PqResultHelper result_helper =
- PqResultHelper{conn_, std::string(query.c_str()), params, error};
+ PqResultHelper result_helper = PqResultHelper{conn_,
std::string(query.c_str())};
- RAISE_ADBC(result_helper.Prepare());
- auto result = result_helper.Execute();
+ auto result = result_helper.Execute(error, params);
if (result != ADBC_STATUS_OK) {
auto error_code = std::string(error->sqlstate, 5);
if ((error_code == "42P01") || (error_code == "42602")) {
@@ -1337,10 +1324,8 @@ AdbcStatusCode PostgresConnection::SetOption(const char*
key, const char* value,
return ADBC_STATUS_OK;
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
// PostgreSQL doesn't accept a parameter here
- PqResultHelper result_helper{
- conn_, std::string("SET search_path TO ") + value, {}, error};
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ PqResultHelper result_helper{conn_, std::string("SET search_path TO ") +
value};
+ RAISE_ADBC(result_helper.Execute(error));
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
diff --git a/c/driver/postgresql/postgresql_test.cc
b/c/driver/postgresql/postgresql_test.cc
index a6d4f7704..ff3dc0b70 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -1247,7 +1247,7 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
- ASSERT_EQ(reader.rows_affected, -1);
+ ASSERT_EQ(reader.rows_affected, 2);
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(reader.array->release, nullptr);
@@ -1276,6 +1276,32 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
}
}
+TEST_F(PostgresStatementTest, ExecuteSchemaParameterizedQuery) {
+ nanoarrow::UniqueSchema schema_bind;
+ ArrowSchemaInit(schema_bind.get());
+ ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1),
+ adbc_validation::IsOkErrno());
+ ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0],
NANOARROW_TYPE_STRING),
+ adbc_validation::IsOkErrno());
+
+ nanoarrow::UniqueArrayStream bind;
+ nanoarrow::EmptyArrayStream(schema_bind.get()).ToArrayStream(bind.get());
+
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT $1", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBindStream(&statement, bind.get(), &error),
IsOkStatus());
+
+ nanoarrow::UniqueSchema schema;
+ ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+ IsOkStatus(&error));
+
+ ASSERT_EQ(1, schema->n_children);
+ ASSERT_STREQ("u", schema->children[0]->format);
+
+ ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
TEST_F(PostgresStatementTest, BatchSizeHint) {
ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "batch_size_hint_test",
&error),
IsOkStatus(&error));
@@ -1345,16 +1371,13 @@ TEST_F(PostgresStatementTest,
AdbcErrorBackwardsCompatibility) {
TEST_F(PostgresStatementTest, Cancel) {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
- for (const char* query : {
- "DROP TABLE IF EXISTS test_cancel",
- "CREATE TABLE test_cancel (ints INT)",
- R"(INSERT INTO test_cancel (ints)
- SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g))",
- }) {
- ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error),
IsOkStatus(&error));
- ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr,
&error),
- IsOkStatus(&error));
- }
+ const char* query = R"(DROP TABLE IF EXISTS test_cancel;
+ CREATE TABLE test_cancel (ints INT);
+ INSERT INTO test_cancel (ints)
+ SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g);)";
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM
test_cancel", &error),
IsOkStatus(&error));
@@ -1381,6 +1404,91 @@ TEST_F(PostgresStatementTest, Cancel) {
ASSERT_NE(0, AdbcErrorGetDetailCount(detail));
}
+TEST_F(PostgresStatementTest, MultipleStatementsSingleQuery) {
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
+
+ const char* query = R"(DROP TABLE IF EXISTS test_query_statements;
+ CREATE TABLE test_query_statements (ints INT);
+ INSERT INTO test_query_statements VALUES((1));
+ INSERT INTO test_query_statements VALUES((2));
+ INSERT INTO test_query_statements VALUES((3));)";
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(
+ AdbcStatementSetSqlQuery(&statement, "SELECT * FROM
test_query_statements", &error),
+ IsOkStatus(&error));
+
+ adbc_validation::StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+ reader.GetSchema();
+ ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+ ASSERT_EQ(reader.array->length, 3);
+}
+
+TEST_F(PostgresStatementTest, SetUseCopyFalse) {
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
+
+ const char* query = R"(DROP TABLE IF EXISTS test_query_set_copy_false;
+ CREATE TABLE test_query_set_copy_false (ints INT);
+ INSERT INTO test_query_set_copy_false VALUES((1));
+ INSERT INTO test_query_set_copy_false VALUES((NULL));
+ INSERT INTO test_query_set_copy_false VALUES((3));)";
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+ IsOkStatus(&error));
+
+ // Check option setting/getting
+ ASSERT_EQ(
+ adbc_validation::StatementGetOption(&statement,
"adbc.postgresql.use_copy", &error),
+ "true");
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+ "not true or false", &error),
+ IsStatus(ADBC_STATUS_INVALID_ARGUMENT));
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_EQ(
+ adbc_validation::StatementGetOption(&statement,
"adbc.postgresql.use_copy", &error),
+ "true");
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_EQ(
+ adbc_validation::StatementGetOption(&statement,
"adbc.postgresql.use_copy", &error),
+ "false");
+
+ ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
+ "SELECT * FROM
test_query_set_copy_false", &error),
+ IsOkStatus(&error));
+
+ adbc_validation::StreamReader reader;
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+ &reader.rows_affected, &error),
+ IsOkStatus(&error));
+
+ ASSERT_EQ(reader.rows_affected, 3);
+
+ reader.GetSchema();
+ ASSERT_EQ(reader.schema->n_children, 1);
+ ASSERT_STREQ(reader.schema->children[0]->format, "i");
+ ASSERT_STREQ(reader.schema->children[0]->name, "ints");
+
+ ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+ ASSERT_EQ(reader.array->length, 3);
+ ASSERT_EQ(reader.array->n_children, 1);
+ ASSERT_EQ(reader.array->children[0]->null_count, 1);
+
+ ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+ ASSERT_EQ(reader.array->release, nullptr);
+}
+
struct TypeTestCase {
std::string name;
std::string sql_type;
diff --git a/c/driver/postgresql/result_helper.cc
b/c/driver/postgresql/result_helper.cc
index ad5a54e00..df890a7c5 100644
--- a/c/driver/postgresql/result_helper.cc
+++ b/c/driver/postgresql/result_helper.cc
@@ -17,24 +17,25 @@
#include "result_helper.h"
+#include <charconv>
+#include <memory>
+
+#include "copy/reader.h"
#include "driver/common/utils.h"
#include "error.h"
namespace adbcpq {
-PqResultHelper::~PqResultHelper() {
- if (result_ != nullptr) {
- PQclear(result_);
- }
-}
+PqResultHelper::~PqResultHelper() { ClearResult(); }
-AdbcStatusCode PqResultHelper::Prepare() {
+AdbcStatusCode PqResultHelper::PrepareInternal(int n_params, const Oid*
param_oids,
+ struct AdbcError* error) {
// TODO: make stmtName a unique identifier?
PGresult* result =
- PQprepare(conn_, /*stmtName=*/"", query_.c_str(), param_values_.size(),
NULL);
+ PQprepare(conn_, /*stmtName=*/"", query_.c_str(), n_params, param_oids);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code =
- SetError(error_, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
+ SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
PQerrorMessage(conn_), query_.c_str());
PQclear(result);
return code;
@@ -44,24 +45,327 @@ AdbcStatusCode PqResultHelper::Prepare() {
return ADBC_STATUS_OK;
}
-AdbcStatusCode PqResultHelper::Execute() {
- std::vector<const char*> param_c_strs;
+AdbcStatusCode PqResultHelper::Prepare(struct AdbcError* error) {
+ return PrepareInternal(0, nullptr, error);
+}
- for (size_t index = 0; index < param_values_.size(); index++) {
- param_c_strs.push_back(param_values_[index].c_str());
+AdbcStatusCode PqResultHelper::Prepare(const std::vector<Oid>& param_oids,
+ struct AdbcError* error) {
+ return PrepareInternal(param_oids.size(), param_oids.data(), error);
+}
+
+AdbcStatusCode PqResultHelper::DescribePrepared(struct AdbcError* error) {
+ ClearResult();
+ result_ = PQdescribePrepared(conn_, /*stmtName=*/"");
+ if (PQresultStatus(result_) != PGRES_COMMAND_OK) {
+ AdbcStatusCode code = SetError(
+ error, result_, "[libpq] Failed to describe prepared statement:
%s\nQuery was:%s",
+ PQerrorMessage(conn_), query_.c_str());
+ ClearResult();
+ return code;
}
- result_ =
- PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(),
NULL, NULL, 0);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::Execute(struct AdbcError* error,
+ const std::vector<std::string>& params,
+ PostgresType* param_types) {
+ if (params.size() == 0 && param_types == nullptr && output_format_ ==
Format::kText) {
+ ClearResult();
+ result_ = PQexec(conn_, query_.c_str());
+ } else {
+ std::vector<const char*> param_values;
+ std::vector<int> param_lengths;
+ std::vector<int> param_formats;
+
+ for (const auto& param : params) {
+ param_values.push_back(param.data());
+ param_lengths.push_back(static_cast<int>(param.size()));
+ param_formats.push_back(static_cast<int>(param_format_));
+ }
+
+ std::vector<Oid> param_oids;
+ const Oid* param_oids_ptr = nullptr;
+ if (param_types != nullptr) {
+ param_oids.resize(params.size());
+ for (size_t i = 0; i < params.size(); i++) {
+ param_oids[i] = param_types->child(i).oid();
+ }
+ param_oids_ptr = param_oids.data();
+ }
+
+ ClearResult();
+ result_ = PQexecParams(conn_, query_.c_str(), param_values.size(),
param_oids_ptr,
+ param_values.data(), param_lengths.data(),
+ param_formats.data(),
static_cast<int>(output_format_));
+ }
ExecStatusType status = PQresultStatus(result_);
if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
- AdbcStatusCode error =
- SetError(error_, result_, "[libpq] Failed to execute query '%s': %s",
+ AdbcStatusCode status =
+ SetError(error, result_, "[libpq] Failed to execute query '%s': %s",
query_.c_str(), PQerrorMessage(conn_));
- return error;
+ return status;
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ExecuteCopy(struct AdbcError* error) {
+ // Remove trailing semicolon(s) from the query before feeding it into COPY
+ while (!query_.empty() && query_.back() == ';') {
+ query_.pop_back();
}
+ std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
+ ClearResult();
+ result_ = PQexecParams(conn_, copy_query.c_str(), /*nParams=*/0,
+ /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
+ /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
+ static_cast<int>(Format::kBinary));
+
+ if (PQresultStatus(result_) != PGRES_COPY_OUT) {
+ AdbcStatusCode code = SetError(
+ error, result_,
+ "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was:
%s",
+ PQerrorMessage(conn_), copy_query.c_str());
+ ClearResult();
+ return code;
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ResolveParamTypes(PostgresTypeResolver&
type_resolver,
+ PostgresType* param_types,
+ struct AdbcError* error) {
+ struct ArrowError na_error;
+ ArrowErrorInit(&na_error);
+
+ const int num_params = PQnparams(result_);
+ PostgresType root_type(PostgresTypeId::kRecord);
+
+ for (int i = 0; i < num_params; i++) {
+ const Oid pg_oid = PQparamtype(result_, i);
+ PostgresType pg_type;
+ if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+ SetError(error, "%s%d%s%s%s%d", "[libpq] Parameter #", i + 1, " (\"",
+ PQfname(result_, i), "\") has unknown type code ", pg_oid);
+ ClearResult();
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+
+ root_type.AppendChild(PQfname(result_, i), pg_type);
+ }
+
+ *param_types = root_type;
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ResolveOutputTypes(PostgresTypeResolver&
type_resolver,
+ PostgresType* result_types,
+ struct AdbcError* error) {
+ struct ArrowError na_error;
+ ArrowErrorInit(&na_error);
+
+ const int num_fields = PQnfields(result_);
+ PostgresType root_type(PostgresTypeId::kRecord);
+
+ for (int i = 0; i < num_fields; i++) {
+ const Oid pg_oid = PQftype(result_, i);
+ PostgresType pg_type;
+ if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+ SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"",
+ PQfname(result_, i), "\") has unknown type code ", pg_oid);
+ ClearResult();
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+
+ root_type.AppendChild(PQfname(result_, i), pg_type);
+ }
+
+ *result_types = root_type;
+ return ADBC_STATUS_OK;
+}
+
+PGresult* PqResultHelper::ReleaseResult() {
+ PGresult* out = result_;
+ result_ = nullptr;
+ return out;
+}
+
+int64_t PqResultHelper::AffectedRows() {
+ if (result_ == nullptr) {
+ return -1;
+ }
+
+ char* first = PQcmdTuples(result_);
+ char* last = first + strlen(first);
+ if ((last - first) == 0) {
+ return -1;
+ }
+
+ int64_t out;
+ auto result = std::from_chars(first, last, out);
+
+ if (result.ec == std::errc() && result.ptr == last) {
+ return out;
+ } else {
+ return -1;
+ }
+}
+
+int PqResultArrayReader::GetSchema(struct ArrowSchema* out) {
+ ResetErrors();
+
+ if (schema_->release == nullptr) {
+ AdbcStatusCode status = Initialize(&error_);
+ if (status != ADBC_STATUS_OK) {
+ return EINVAL;
+ }
+ }
+
+ return ArrowSchemaDeepCopy(schema_.get(), out);
+}
+
+int PqResultArrayReader::GetNext(struct ArrowArray* out) {
+ ResetErrors();
+
+ if (schema_->release == nullptr) {
+ AdbcStatusCode status = Initialize(&error_);
+ if (status != ADBC_STATUS_OK) {
+ return EINVAL;
+ }
+ }
+
+ if (!helper_.HasResult()) {
+ out->release = nullptr;
+ return NANOARROW_OK;
+ }
+
+ nanoarrow::UniqueArray tmp;
+ NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(tmp.get(), schema_.get(),
&na_error_));
+ NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(tmp.get()));
+ for (int i = 0; i < helper_.NumColumns(); i++) {
+ NANOARROW_RETURN_NOT_OK(field_readers_[i]->InitArray(tmp->children[i]));
+ }
+
+ // TODO: If we get an EOVERFLOW here (e.g., big string data), we
+ // would need to keep track of what row number we're on and start
+ // from there instead of begin() on the next call. We could also
+ // respect the size hint here to chunk the batches.
+ struct ArrowBufferView item;
+ for (auto it = helper_.begin(); it != helper_.end(); it++) {
+ auto row = *it;
+ for (int i = 0; i < helper_.NumColumns(); i++) {
+ auto pg_item = row[i];
+ item.data.data = pg_item.data;
+
+ if (pg_item.is_null) {
+ item.size_bytes = -1;
+ } else {
+ item.size_bytes = pg_item.len;
+ }
+
+ NANOARROW_RETURN_NOT_OK(
+ field_readers_[i]->Read(&item, item.size_bytes, tmp->children[i],
&na_error_));
+ }
+ }
+
+ for (int i = 0; i < helper_.NumColumns(); i++) {
+ NANOARROW_RETURN_NOT_OK(field_readers_[i]->FinishArray(tmp->children[i],
&na_error_));
+ }
+
+ tmp->length = helper_.NumRows();
+ tmp->null_count = 0;
+ NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(tmp.get(),
&na_error_));
+
+ // Ensure that the next call to GetNext() will signal the end of the stream
+ helper_.ClearResult();
+
+ // Canonically return zero-size results as an empty stream
+ if (tmp->length == 0) {
+ out->release = nullptr;
+ return NANOARROW_OK;
+ }
+
+ ArrowArrayMove(tmp.get(), out);
+ return NANOARROW_OK;
+}
+
+const char* PqResultArrayReader::GetLastError() {
+ if (error_.message != nullptr) {
+ return error_.message;
+ } else {
+ return na_error_.message;
+ }
+}
+
+AdbcStatusCode PqResultArrayReader::Initialize(struct AdbcError* error) {
+ helper_.set_output_format(PqResultHelper::Format::kBinary);
+ RAISE_ADBC(helper_.Execute(error));
+
+ ArrowSchemaInit(schema_.get());
+ CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(),
helper_.NumColumns()),
+ &na_error_, error);
+
+ for (int i = 0; i < helper_.NumColumns(); i++) {
+ PostgresType child_type;
+ CHECK_NA_DETAIL(INTERNAL,
+ type_resolver_->Find(helper_.FieldType(i), &child_type,
&na_error_),
+ &na_error_, error);
+
+ CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i],
helper_.FieldName(i)),
+ error);
+
+ std::unique_ptr<PostgresCopyFieldReader> child_reader;
+ CHECK_NA_DETAIL(
+ INTERNAL,
+ MakeCopyFieldReader(child_type, schema_->children[i], &child_reader,
&na_error_),
+ &na_error_, error);
+
+ child_reader->Init(child_type);
+ CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]),
&na_error_,
+ error);
+
+ field_readers_.push_back(std::move(child_reader));
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ if (out == nullptr) {
+ // If there is no output requested, we still need to execute and set
+ // affected_rows if needed. We don't need an output schema or to set
+ // up a copy reader, so we can skip those steps by going straight
+ // to Execute(). This also enables us to support queries with multiple
+ // statements because we can call PQexec() instead of PQexecParams().
+ RAISE_ADBC(helper_.Execute(error));
+
+ if (affected_rows != nullptr) {
+ *affected_rows = helper_.AffectedRows();
+ }
+
+ return ADBC_STATUS_OK;
+ }
+
+ // Execute eagerly. We need this to provide row counts for DELETE and
+ // CREATE TABLE queries as well as to provide more informative errors
+ // until this reader class is wired up to provide extended AdbcError
+ // information.
+ RAISE_ADBC(Initialize(error));
+ if (affected_rows != nullptr) {
+ *affected_rows = helper_.AffectedRows();
+ }
+
+ nanoarrow::ArrayStreamFactory<PqResultArrayReader>::InitArrayStream(
+ new PqResultArrayReader(this), out);
+
return ADBC_STATUS_OK;
}
diff --git a/c/driver/postgresql/result_helper.h
b/c/driver/postgresql/result_helper.h
index 25a79fad8..43083b8bc 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -18,6 +18,7 @@
#pragma once
#include <cassert>
+#include <memory>
#include <optional>
#include <string>
#include <utility>
@@ -26,6 +27,8 @@
#include <arrow-adbc/adbc.h>
#include <libpq-fe.h>
+#include "copy/reader.h"
+
namespace adbcpq {
/// \brief A single column in a single row of a result set.
@@ -73,25 +76,57 @@ class PqResultRow {
// prior to iterating
class PqResultHelper {
public:
- explicit PqResultHelper(PGconn* conn, std::string query, struct AdbcError*
error)
- : conn_(conn), query_(std::move(query)), error_(error) {}
+ enum class Format {
+ kText = 0,
+ kBinary = 1,
+ };
- explicit PqResultHelper(PGconn* conn, std::string query,
- std::vector<std::string> param_values, struct
AdbcError* error)
- : conn_(conn),
- query_(std::move(query)),
- param_values_(std::move(param_values)),
- error_(error) {}
+ explicit PqResultHelper(PGconn* conn, std::string query)
+ : conn_(conn), query_(std::move(query)) {}
+
+ PqResultHelper(PqResultHelper&& other)
+ : PqResultHelper(other.conn_, std::move(other.query_)) {
+ result_ = other.result_;
+ other.result_ = nullptr;
+ }
~PqResultHelper();
- AdbcStatusCode Prepare();
- AdbcStatusCode Execute();
+ void set_param_format(Format format) { param_format_ = format; }
+ void set_output_format(Format format) { output_format_ = format; }
+
+ AdbcStatusCode Prepare(struct AdbcError* error);
+ AdbcStatusCode Prepare(const std::vector<Oid>& param_oids, struct AdbcError*
error);
+ AdbcStatusCode DescribePrepared(struct AdbcError* error);
+ AdbcStatusCode Execute(struct AdbcError* error,
+ const std::vector<std::string>& params = {},
+ PostgresType* param_types = nullptr);
+ AdbcStatusCode ExecuteCopy(struct AdbcError* error);
+ AdbcStatusCode ResolveParamTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* param_types, struct
AdbcError* error);
+ AdbcStatusCode ResolveOutputTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* result_types, struct
AdbcError* error);
+
+ bool HasResult() { return result_ != nullptr; }
+
+ PGresult* ReleaseResult();
+
+ void ClearResult() {
+ PQclear(result_);
+ result_ = nullptr;
+ }
+
+ int64_t AffectedRows();
int NumRows() const { return PQntuples(result_); }
int NumColumns() const { return PQnfields(result_); }
+ const char* FieldName(int column_number) const {
+ return PQfname(result_, column_number);
+ }
+ Oid FieldType(int column_number) const { return PQftype(result_,
column_number); }
+
class iterator {
const PqResultHelper& outer_;
int curr_row_ = 0;
@@ -127,7 +162,58 @@ class PqResultHelper {
PGresult* result_ = nullptr;
PGconn* conn_;
std::string query_;
- std::vector<std::string> param_values_;
- struct AdbcError* error_;
+ Format param_format_ = Format::kText;
+ Format output_format_ = Format::kText;
+
+ AdbcStatusCode PrepareInternal(int n_params, const Oid* param_oids,
+ struct AdbcError* error);
};
+
+class PqResultArrayReader {
+ public:
+ PqResultArrayReader(PGconn* conn, std::shared_ptr<PostgresTypeResolver>
type_resolver,
+ std::string query)
+ : helper_(conn, std::move(query)), type_resolver_(type_resolver) {
+ ArrowErrorInit(&na_error_);
+ error_ = ADBC_ERROR_INIT;
+ }
+
+ ~PqResultArrayReader() { ResetErrors(); }
+
+ int GetSchema(struct ArrowSchema* out);
+ int GetNext(struct ArrowArray* out);
+ const char* GetLastError();
+
+ AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct
ArrowArrayStream* out,
+ struct AdbcError* error);
+
+ AdbcStatusCode Initialize(struct AdbcError* error);
+
+ private:
+ PqResultHelper helper_;
+ std::shared_ptr<PostgresTypeResolver> type_resolver_;
+ std::vector<std::unique_ptr<PostgresCopyFieldReader>> field_readers_;
+ nanoarrow::UniqueSchema schema_;
+ struct AdbcError error_;
+ struct ArrowError na_error_;
+
+ explicit PqResultArrayReader(PqResultArrayReader* other)
+ : helper_(std::move(other->helper_)),
+ type_resolver_(std::move(other->type_resolver_)),
+ field_readers_(std::move(other->field_readers_)),
+ schema_(std::move(other->schema_)) {
+ ArrowErrorInit(&na_error_);
+ error_ = ADBC_ERROR_INIT;
+ }
+
+ void ResetErrors() {
+ ArrowErrorInit(&na_error_);
+
+ if (error_.private_data != nullptr) {
+ error_.release(&error_);
+ }
+ error_ = ADBC_ERROR_INIT;
+ }
+};
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 444355123..0fa8a79b9 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -82,30 +82,6 @@ struct OneValueStream {
}
};
-/// Build an PostgresType object from a PGresult*
-AdbcStatusCode ResolvePostgresType(const PostgresTypeResolver& type_resolver,
- PGresult* result, PostgresType* out,
- struct AdbcError* error) {
- ArrowError na_error;
- const int num_fields = PQnfields(result);
- PostgresType root_type(PostgresTypeId::kRecord);
-
- for (int i = 0; i < num_fields; i++) {
- const Oid pg_oid = PQftype(result, i);
- PostgresType pg_type;
- if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
- SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"",
- PQfname(result, i), "\") has unknown type code ", pg_oid);
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
-
- root_type.AppendChild(PQfname(result, i), pg_type);
- }
-
- *out = root_type;
- return ADBC_STATUS_OK;
-}
-
/// Helper to manage bind parameters with a prepared statement
struct BindStream {
Handle<struct ArrowArrayStream> bind;
@@ -1148,18 +1124,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
return ADBC_STATUS_OK;
}
-AdbcStatusCode PostgresStatement::ExecutePreparedStatement(
- struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError*
error) {
- if (!bind_.release) {
- // TODO: set an empty stream just to unify the code paths
- SetError(error, "%s",
- "[libpq] Prepared statements without parameters are not
implemented");
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
+AdbcStatusCode PostgresStatement::ExecuteBind(struct ArrowArrayStream* stream,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
if (stream) {
// TODO:
SetError(error, "%s",
- "[libpq] Prepared statements returning result sets are not
implemented");
+ "[libpq] Prepared statements with parameters returning result
sets are not "
+ "implemented");
+
return ADBC_STATUS_NOT_IMPLEMENTED;
}
@@ -1178,27 +1151,10 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
int64_t* rows_affected,
struct AdbcError* error) {
ClearResult();
- if (prepared_) {
- if (bind_.release || !stream) {
- return ExecutePreparedStatement(stream, rows_affected, error);
- }
- // XXX: don't use a prepared statement to execute a no-parameter
- // result-set-returning query for now, since we can't easily get
- // access to COPY there. (This might have to become sequential
- // executions of COPY (EXECUTE ($n, ...)) TO STDOUT which won't
- // get all the benefits of a prepared statement.) At preparation
- // time we don't know whether the query will be used with a result
- // set or not without analyzing the query (we could prepare both?)
- // and https://stackoverflow.com/questions/69233792 suggests that
- // you can't PREPARE a query containing COPY.
- }
- if (!stream && !ingest_.target.empty()) {
- return ExecuteUpdateBulk(rows_affected, error);
- }
- // Remove trailing semicolon(s) from the query before feeding it into COPY
- while (!query_.empty() && query_.back() == ';') {
- query_.pop_back();
+ // Use a dedicated path to handle bulk ingest
+ if (!ingest_.target.empty()) {
+ return ExecuteIngest(stream, rows_affected, error);
}
if (query_.empty()) {
@@ -1206,53 +1162,53 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
return ADBC_STATUS_INVALID_STATE;
}
- // 1. Prepare the query to get the schema
- {
- RAISE_ADBC(SetupReader(error));
-
- // If the caller did not request a result set or if there are no
- // inferred output columns (e.g. a CREATE or UPDATE), then don't
- // use COPY (which would fail anyways)
- if (!stream || reader_.copy_reader_->pg_type().n_children() == 0) {
- RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error));
- if (stream) {
- struct ArrowSchema schema;
- std::memset(&schema, 0, sizeof(schema));
- RAISE_NA(reader_.copy_reader_->GetSchema(&schema));
- nanoarrow::EmptyArrayStream::MakeUnique(&schema).move(stream);
- }
- return ADBC_STATUS_OK;
- }
+ // Use a dedicated path to handle parameter binding
+ if (bind_.release != nullptr) {
+ return ExecuteBind(stream, rows_affected, error);
+ }
- // This resolves the reader specific to each PostgresType -> ArrowSchema
- // conversion. It is unlikely that this will fail given that we have just
- // inferred these conversions ourselves.
- struct ArrowError na_error;
- int na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
- if (na_res != NANOARROW_OK) {
- SetError(error, "[libpq] Failed to initialize field readers: %s",
na_error.message);
- return na_res;
- }
+ // If we have been requested to avoid COPY or there is no output requested,
+ // execute using the PqResultArrayReader.
+ if (!stream || !use_copy_) {
+ PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+ RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+ return ADBC_STATUS_OK;
}
- // 2. Execute the query with COPY to get binary tuples
- {
- std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
- reader_.result_ =
- PQexecParams(connection_->conn(), copy_query.c_str(), /*nParams=*/0,
- /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
- /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
kPgBinaryFormat);
- if (PQresultStatus(reader_.result_) != PGRES_COPY_OUT) {
- AdbcStatusCode code = SetError(
- error, reader_.result_,
- "[libpq] Failed to execute query: could not begin COPY: %s\nQuery
was: %s",
- PQerrorMessage(connection_->conn()), copy_query.c_str());
- ClearResult();
- return code;
- }
- // Result is read from the connection, not the result, but we won't clear
it here
+ PqResultHelper helper(connection_->conn(), query_);
+ RAISE_ADBC(helper.Prepare(error));
+ RAISE_ADBC(helper.DescribePrepared(error));
+
+ // Initialize the copy reader and infer the output schema (i.e., error for
+ // unsupported types before issuing the COPY query). This could be lazier
+ // (i.e., executed on the first call to GetSchema() or GetNext()).
+ PostgresType root_type;
+ RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &root_type, error));
+
+ // If there will be no columns in the result, we can also avoid COPY
+ if (root_type.n_children() == 0) {
+ // Could/should move the helper into the reader instead of repreparing
+ PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+ RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+ return ADBC_STATUS_OK;
}
+ struct ArrowError na_error;
+ reader_.copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
+ CHECK_NA(INTERNAL, reader_.copy_reader_->Init(root_type), error);
+ CHECK_NA_DETAIL(INTERNAL,
reader_.copy_reader_->InferOutputSchema(&na_error), &na_error,
+ error);
+
+ CHECK_NA_DETAIL(INTERNAL, reader_.copy_reader_->InitFieldReaders(&na_error),
&na_error,
+ error);
+
+ // Execute the COPY query
+ RAISE_ADBC(helper.ExecuteCopy(error));
+
+ // We need the PQresult back for the reader
+ reader_.result_ = helper.ReleaseResult();
+
+ // Export to stream
reader_.ExportTo(stream);
if (rows_affected) *rows_affected = -1;
return ADBC_STATUS_OK;
@@ -1264,31 +1220,69 @@ AdbcStatusCode PostgresStatement::ExecuteSchema(struct
ArrowSchema* schema,
if (query_.empty()) {
SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
return ADBC_STATUS_INVALID_STATE;
- } else if (bind_.release) {
- // TODO: if we have parameters, bind them (since they can affect the
output schema)
- SetError(error, "[libpq] ExecuteSchema with parameters is not
implemented");
- return ADBC_STATUS_NOT_IMPLEMENTED;
}
- RAISE_ADBC(SetupReader(error));
- CHECK_NA(INTERNAL, reader_.copy_reader_->GetSchema(schema), error);
+ PqResultHelper helper(connection_->conn(), query_);
+
+ if (bind_.release) {
+ nanoarrow::UniqueSchema schema;
+ struct ArrowError na_error;
+ ArrowErrorInit(&na_error);
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayStreamGetSchema(&bind_, schema.get(),
&na_error),
+ &na_error, error);
+
+ if (std::string(schema->format) != "+s") {
+ SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ std::vector<Oid> param_oids(schema->n_children);
+ for (int64_t i = 0; i < schema->n_children; i++) {
+ PostgresType pg_type;
+ CHECK_NA_DETAIL(INTERNAL,
+ PostgresType::FromSchema(*type_resolver_,
schema->children[i],
+ &pg_type, &na_error),
+ &na_error, error);
+ param_oids[i] = pg_type.oid();
+ }
+
+ RAISE_ADBC(helper.Prepare(param_oids, error));
+ } else {
+ RAISE_ADBC(helper.Prepare(error));
+ }
+
+ RAISE_ADBC(helper.DescribePrepared(error));
+
+ PostgresType output_type;
+ RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &output_type, error));
+
+ nanoarrow::UniqueSchema tmp;
+ ArrowSchemaInit(tmp.get());
+ CHECK_NA(INTERNAL, output_type.SetSchema(tmp.get()), error);
+
+ tmp.move(schema);
return ADBC_STATUS_OK;
}
-AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
- struct AdbcError* error) {
+AdbcStatusCode PostgresStatement::ExecuteIngest(struct ArrowArrayStream*
stream,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
if (!bind_.release) {
SetError(error, "%s", "[libpq] Must Bind() before Execute() for bulk
ingestion");
return ADBC_STATUS_INVALID_STATE;
}
+ if (stream != nullptr) {
+ SetError(error, "%s", "[libpq] Bulk ingest with result set is not
supported");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+
// Need the current schema to avoid being shadowed by temp tables
// This is a little unfortunate; we need another DB roundtrip
std::string current_schema;
{
- PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA",
{}, error};
- RAISE_ADBC(result_helper.Prepare());
- RAISE_ADBC(result_helper.Execute());
+ PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA"};
+ RAISE_ADBC(result_helper.Execute(error));
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
@@ -1329,37 +1323,6 @@ AdbcStatusCode
PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
return ADBC_STATUS_OK;
}
-AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected,
- struct AdbcError* error) {
- // NOTE: must prepare first (used in ExecuteQuery)
- PGresult* result =
- PQexecPrepared(connection_->conn(), /*stmtName=*/"", /*nParams=*/0,
- /*paramValues=*/nullptr, /*paramLengths=*/nullptr,
- /*paramFormats=*/nullptr,
/*resultFormat=*/kPgBinaryFormat);
- ExecStatusType status = PQresultStatus(result);
- if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
- AdbcStatusCode code =
- SetError(error, result, "[libpq] Failed to execute query: %s\nQuery
was:%s",
- PQerrorMessage(connection_->conn()), query_.c_str());
- PQclear(result);
- return code;
- }
- if (rows_affected) {
- if (status == PGRES_TUPLES_OK) {
- *rows_affected = PQntuples(reader_.result_);
- } else {
- // In theory, PQcmdTuples would work here, but experimentally it gives
- // an empty string even for a DELETE. (Also, why does it return a
- // string...) Possibly, it doesn't work because we use PQexecPrepared
- // but the docstring is careful to specify it works on an EXECUTE of a
- // prepared statement.
- *rows_affected = -1;
- }
- }
- PQclear(result);
- return ADBC_STATUS_OK;
-}
-
AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value,
size_t* length,
struct AdbcError* error) {
std::string result;
@@ -1384,6 +1347,12 @@ AdbcStatusCode PostgresStatement::GetOption(const char*
key, char* value, size_t
}
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) ==
0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
+ } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
+ if (use_copy_) {
+ result = "true";
+ } else {
+ result = "false";
+ }
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
@@ -1503,6 +1472,15 @@ AdbcStatusCode PostgresStatement::SetOption(const char*
key, const char* value,
}
this->reader_.batch_size_hint_bytes_ = int_value;
+ } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
+ if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+ use_copy_ = true;
+ } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+ use_copy_ = false;
+ } else {
+ SetError(error, "[libpq] Invalid value '%s' for option '%s'", value,
key);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -1537,53 +1515,9 @@ AdbcStatusCode PostgresStatement::SetOptionInt(const
char* key, int64_t value,
return ADBC_STATUS_NOT_IMPLEMENTED;
}
-AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) {
- // TODO: we should pipeline here and assume this will succeed
- PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"",
query_.c_str(),
- /*nParams=*/0, nullptr);
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, result,
- "[libpq] Failed to execute query: could not infer schema:
failed to "
- "prepare query: %s\nQuery was:%s",
- PQerrorMessage(connection_->conn()), query_.c_str());
- PQclear(result);
- return code;
- }
- PQclear(result);
- result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, result,
- "[libpq] Failed to execute query: could not infer schema:
failed to "
- "describe prepared statement: %s\nQuery was:%s",
- PQerrorMessage(connection_->conn()), query_.c_str());
- PQclear(result);
- return code;
- }
-
- // Resolve the information from the PGresult into a PostgresType
- PostgresType root_type;
- AdbcStatusCode status = ResolvePostgresType(*type_resolver_, result,
&root_type, error);
- PQclear(result);
- if (status != ADBC_STATUS_OK) return status;
-
- // Initialize the copy reader and infer the output schema (i.e., error for
- // unsupported types before issuing the COPY query)
- reader_.copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
- reader_.copy_reader_->Init(root_type);
- struct ArrowError na_error;
- int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
- if (na_res != NANOARROW_OK) {
- SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s",
na_res,
- std::strerror(na_res), na_error.message);
- return ADBC_STATUS_INTERNAL;
- }
- return ADBC_STATUS_OK;
-}
-
void PostgresStatement::ClearResult() {
// TODO: we may want to synchronize here for safety
reader_.Release();
}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index f2387a3ac..1cd60bff5 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -33,6 +33,8 @@
#define ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES \
"adbc.postgresql.batch_size_hint_bytes"
+#define ADBC_POSTGRESQL_OPTION_USE_COPY "adbc.postgresql.use_copy"
+
namespace adbcpq {
class PostgresConnection;
class PostgresStatement;
@@ -90,7 +92,11 @@ class TupleReader final {
class PostgresStatement {
public:
PostgresStatement()
- : connection_(nullptr), query_(), prepared_(false), reader_(nullptr) {
+ : connection_(nullptr),
+ query_(),
+ prepared_(false),
+ use_copy_(true),
+ reader_(nullptr) {
std::memset(&bind_, 0, sizeof(bind_));
}
@@ -130,12 +136,10 @@ class PostgresStatement {
const std::vector<struct ArrowSchemaView>& source_schema_fields,
std::string* escaped_table, std::string* escaped_field_list,
struct AdbcError* error);
- AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError*
error);
- AdbcStatusCode ExecuteUpdateQuery(int64_t* rows_affected, struct AdbcError*
error);
- AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream,
- int64_t* rows_affected,
- struct AdbcError* error);
- AdbcStatusCode SetupReader(struct AdbcError* error);
+ AdbcStatusCode ExecuteIngest(struct ArrowArrayStream* stream, int64_t*
rows_affected,
+ struct AdbcError* error);
+ AdbcStatusCode ExecuteBind(struct ArrowArrayStream* stream, int64_t*
rows_affected,
+ struct AdbcError* error);
private:
std::shared_ptr<PostgresTypeResolver> type_resolver_;
@@ -154,6 +158,9 @@ class PostgresStatement {
kCreateAppend,
};
+ // Options
+ bool use_copy_;
+
struct {
std::string db_schema;
std::string target;
diff --git a/c/validation/adbc_validation_util.cc
b/c/validation/adbc_validation_util.cc
index b319e5495..54c18cce7 100644
--- a/c/validation/adbc_validation_util.cc
+++ b/c/validation/adbc_validation_util.cc
@@ -36,6 +36,20 @@ std::optional<std::string> ConnectionGetOption(struct
AdbcConnection* connection
return std::string(buffer, buffer_size - 1);
}
+std::optional<std::string> StatementGetOption(struct AdbcStatement* statement,
+ std::string_view option,
+ struct AdbcError* error) {
+ char buffer[128];
+ size_t buffer_size = sizeof(buffer);
+ AdbcStatusCode status =
+ AdbcStatementGetOption(statement, option.data(), buffer, &buffer_size,
error);
+ EXPECT_THAT(status, IsOkStatus(error));
+ if (status != ADBC_STATUS_OK) return std::nullopt;
+ EXPECT_GT(buffer_size, 0);
+ if (buffer_size == 0) return std::nullopt;
+ return std::string(buffer, buffer_size - 1);
+}
+
std::string StatusCodeToString(AdbcStatusCode code) {
#define CASE(CONSTANT) \
case ADBC_STATUS_##CONSTANT: \
diff --git a/c/validation/adbc_validation_util.h
b/c/validation/adbc_validation_util.h
index 0dba6dafb..21eca52de 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -43,6 +43,10 @@ std::optional<std::string> ConnectionGetOption(struct
AdbcConnection* connection
std::string_view option,
struct AdbcError* error);
+std::optional<std::string> StatementGetOption(struct AdbcStatement* statement,
+ std::string_view option,
+ struct AdbcError* error);
+
// ------------------------------------------------------------
// Helpers to print values
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py
b/python/adbc_driver_postgresql/tests/test_dbapi.py
index 94cd9f82d..9e2661d54 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -148,7 +148,7 @@ def test_query_execute_schema(postgres: dbapi.Connection)
-> None:
def test_query_invalid(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
with pytest.raises(
- postgres.ProgrammingError, match="failed to prepare query"
+ postgres.ProgrammingError, match="Failed to prepare query"
) as excinfo:
cur.execute("SELECT * FROM tabledoesnotexist")