This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new a48ccecbc refactor(c/driver/postgresql): Use Status instead of
AdbcStatusCode/AdbcError in result helper (#2178)
a48ccecbc is described below
commit a48ccecbcedbe7e01160553e2c12f297211896b2
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Sep 24 23:18:12 2024 -0500
refactor(c/driver/postgresql): Use Status instead of
AdbcStatusCode/AdbcError in result helper (#2178)
This PR migrates the internal PqResultHelper and PqResultReader to use
the Status error handling system. This is to make it less painful to
transition the bind stream and objects helper (and later the
database/connection/statement) to the framework.
The CI failures are still GLib packaging and Python packaging.
---------
Co-authored-by: David Li <[email protected]>
---
c/driver/common/utils.c | 4 ++
c/driver/common/utils.h | 1 +
c/driver/framework/status.h | 37 ++++++++++
c/driver/postgresql/connection.cc | 28 +++-----
c/driver/postgresql/error.cc | 73 ++++---------------
c/driver/postgresql/error.h | 73 ++++++++++++++++++-
c/driver/postgresql/postgresql.cc | 32 +++++++--
c/driver/postgresql/result_helper.cc | 87 +++++++++++------------
c/driver/postgresql/result_helper.h | 27 +++----
c/driver/postgresql/result_reader.cc | 133 ++++++++++++++++++++---------------
c/driver/postgresql/result_reader.h | 9 ++-
c/driver/postgresql/statement.cc | 24 +++----
12 files changed, 315 insertions(+), 213 deletions(-)
diff --git a/c/driver/common/utils.c b/c/driver/common/utils.c
index 9df2b805a..00ebd5193 100644
--- a/c/driver/common/utils.c
+++ b/c/driver/common/utils.c
@@ -235,6 +235,10 @@ struct AdbcErrorDetail CommonErrorGetDetail(const struct
AdbcError* error, int i
};
}
+bool IsCommonError(const struct AdbcError* error) {
+ return error->release == ReleaseErrorWithDetails || error->release ==
ReleaseError;
+}
+
int StringBuilderInit(struct StringBuilder* builder, size_t initial_size) {
builder->buffer = (char*)malloc(initial_size);
if (builder->buffer == NULL) return errno;
diff --git a/c/driver/common/utils.h b/c/driver/common/utils.h
index ec3227f88..d204821b2 100644
--- a/c/driver/common/utils.h
+++ b/c/driver/common/utils.h
@@ -53,6 +53,7 @@ void AppendErrorDetail(struct AdbcError* error, const char*
key, const uint8_t*
int CommonErrorGetDetailCount(const struct AdbcError* error);
struct AdbcErrorDetail CommonErrorGetDetail(const struct AdbcError* error, int
index);
+bool IsCommonError(const struct AdbcError* error);
struct StringBuilder {
char* buffer;
diff --git a/c/driver/framework/status.h b/c/driver/framework/status.h
index d7952a69f..cfdca6ebb 100644
--- a/c/driver/framework/status.h
+++ b/c/driver/framework/status.h
@@ -69,6 +69,19 @@ class Status {
impl_->details.push_back({std::move(key), std::move(value)});
}
+  /// \brief Set the sqlstate of this status
+  ///
+  /// The stored state is zero-filled first; the leading characters of
+  /// sqlstate are then copied in, stopping at sizeof(impl_->sql_state).
+  /// Must not be called on a status without an impl (asserted).
+  void SetSqlState(std::string sqlstate) {
+    assert(impl_ != nullptr);
+    std::memset(impl_->sql_state, 0, sizeof(impl_->sql_state));
+
+    // Copy whichever is shorter: the input or the fixed-size field
+    const size_t capacity = sizeof(impl_->sql_state);
+    const size_t n_copy = sqlstate.size() < capacity ? sqlstate.size() : capacity;
+    for (size_t pos = 0; pos < n_copy; ++pos) {
+      impl_->sql_state[pos] = sqlstate[pos];
+    }
+  }
+
/// \brief Export this status to an AdbcError.
AdbcStatusCode ToAdbc(AdbcError* adbc_error) const {
if (impl_ == nullptr) return ADBC_STATUS_OK;
@@ -112,6 +125,27 @@ class Status {
return status;
}
+ // Helpers to create statuses with known codes
+ static Status Ok() { return Status(); }  // a default-constructed Status
+
+#define STATUS_CTOR(NAME, CODE) \
+ template <typename... Args> \
+ static Status NAME(Args&&... args) { \
+ std::stringstream ss; \
+ ([&] { ss << args; }(), ...); \
+ return Status(ADBC_STATUS_##CODE, ss.str()); \
+ }
+
+ STATUS_CTOR(Internal, INTERNAL)  // Status::Internal(args...): args streamed in order into the message, code ADBC_STATUS_INTERNAL
+ STATUS_CTOR(InvalidArgument, INVALID_ARGUMENT)  // code ADBC_STATUS_INVALID_ARGUMENT
+ STATUS_CTOR(InvalidState, INVALID_STATE)  // code ADBC_STATUS_INVALID_STATE
+ STATUS_CTOR(IO, IO)  // code ADBC_STATUS_IO
+ STATUS_CTOR(NotFound, NOT_FOUND)  // code ADBC_STATUS_NOT_FOUND
+ STATUS_CTOR(NotImplemented, NOT_IMPLEMENTED)  // code ADBC_STATUS_NOT_IMPLEMENTED
+ STATUS_CTOR(Unknown, UNKNOWN)  // code ADBC_STATUS_UNKNOWN
+
+#undef STATUS_CTOR
+
private:
struct Impl {
// invariant: code is never OK
@@ -133,6 +167,8 @@ class Status {
template <typename DatabaseT, typename ConnectionT, typename StatementT>
friend class Driver;
+ // Allow access to these for drivers transitioning to the framework
+ public:
int CDetailCount() const { return impl_ ?
static_cast<int>(impl_->details.size()) : 0; }
AdbcErrorDetail CDetail(int index) const {
@@ -144,6 +180,7 @@ class Status {
detail.second.size()};
}
+ private:
static void CRelease(AdbcError* error) {
if (error->vendor_code == ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA) {
auto* error_obj = reinterpret_cast<Status*>(error->private_data);
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index 7b5bb9791..ca88c0a4e 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -149,7 +149,7 @@ class PqGetObjectsHelper {
auto result_helper = PqResultHelper{conn_, std::string(query.buffer)};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Execute(error_, params));
+ RAISE_STATUS(error_, result_helper.Execute(params));
for (PqResultRow row : result_helper) {
const char* schema_name = row[0].data;
@@ -190,7 +190,7 @@ class PqGetObjectsHelper {
PqResultHelper result_helper = PqResultHelper{conn_,
std::string(query.buffer)};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Execute(error_, params));
+ RAISE_STATUS(error_, result_helper.Execute(params));
for (PqResultRow row : result_helper) {
const char* db_name = row[0].data;
@@ -280,7 +280,7 @@ class PqGetObjectsHelper {
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Execute(error_, params));
+ RAISE_STATUS(error_, result_helper.Execute(params));
for (PqResultRow row : result_helper) {
const char* table_name = row[0].data;
const char* table_type = row[1].data;
@@ -340,7 +340,7 @@ class PqGetObjectsHelper {
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Execute(error_, params));
+ RAISE_STATUS(error_, result_helper.Execute(params));
for (PqResultRow row : result_helper) {
const char* column_name = row[0].data;
@@ -491,7 +491,7 @@ class PqGetObjectsHelper {
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);
- RAISE_ADBC(result_helper.Execute(error_, params));
+ RAISE_STATUS(error_, result_helper.Execute(params));
for (PqResultRow row : result_helper) {
const char* constraint_name = row[0].data;
@@ -655,7 +655,7 @@ AdbcStatusCode PostgresConnection::GetInfo(struct
AdbcConnection* connection,
case ADBC_INFO_VENDOR_VERSION: {
const char* stmt = "SHOW server_version_num";
auto result_helper = PqResultHelper{conn_, std::string(stmt)};
- RAISE_ADBC(result_helper.Execute(error));
+ RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for '%s'",
stmt);
@@ -719,7 +719,7 @@ AdbcStatusCode PostgresConnection::GetOption(const char*
option, char* value,
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) ==
0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"};
- RAISE_ADBC(result_helper.Execute(error));
+ RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
@@ -889,7 +889,8 @@ AdbcStatusCode PostgresConnectionGetStatisticsImpl(PGconn*
conn, const char* db_
{
PqResultHelper result_helper{conn, query};
- RAISE_ADBC(result_helper.Execute(error, {db_schema, table_name ?
table_name : "%"}));
+ RAISE_STATUS(error,
+ result_helper.Execute({db_schema, table_name ? table_name :
"%"}));
for (PqResultRow row : result_helper) {
auto reltuples = row[5].ParseDouble();
@@ -1126,14 +1127,7 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const
char* catalog,
PqResultHelper result_helper = PqResultHelper{conn_,
std::string(query.c_str())};
- auto result = result_helper.Execute(error, params);
- if (result != ADBC_STATUS_OK) {
- auto error_code = std::string(error->sqlstate, 5);
- if ((error_code == "42P01") || (error_code == "42602")) {
- return ADBC_STATUS_NOT_FOUND;
- }
- return result;
- }
+ RAISE_STATUS(error, result_helper.Execute(params));
auto uschema = nanoarrow::UniqueSchema();
ArrowSchemaInit(uschema.get());
@@ -1257,7 +1251,7 @@ AdbcStatusCode PostgresConnection::SetOption(const char*
key, const char* value,
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
// PostgreSQL doesn't accept a parameter here
PqResultHelper result_helper{conn_, std::string("SET search_path TO ") +
value};
- RAISE_ADBC(result_helper.Execute(error));
+ RAISE_STATUS(error, result_helper.Execute());
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
diff --git a/c/driver/postgresql/error.cc b/c/driver/postgresql/error.cc
index 276aadc1c..173868baf 100644
--- a/c/driver/postgresql/error.cc
+++ b/c/driver/postgresql/error.cc
@@ -17,8 +17,8 @@
#include "error.h"
-#include <postgres_ext.h>
#include <stdarg.h>
+#include <stdio.h>
#include <cstring>
#include <string>
#include <vector>
@@ -29,72 +29,27 @@
namespace adbcpq {
-namespace {
-struct DetailField {
- int code;
- std::string key;
-};
-
-static const std::vector<DetailField> kDetailFields = {
- {PG_DIAG_COLUMN_NAME, "PG_DIAG_COLUMN_NAME"},
- {PG_DIAG_CONTEXT, "PG_DIAG_CONTEXT"},
- {PG_DIAG_CONSTRAINT_NAME, "PG_DIAG_CONSTRAINT_NAME"},
- {PG_DIAG_DATATYPE_NAME, "PG_DIAG_DATATYPE_NAME"},
- {PG_DIAG_INTERNAL_POSITION, "PG_DIAG_INTERNAL_POSITION"},
- {PG_DIAG_INTERNAL_QUERY, "PG_DIAG_INTERNAL_QUERY"},
- {PG_DIAG_MESSAGE_PRIMARY, "PG_DIAG_MESSAGE_PRIMARY"},
- {PG_DIAG_MESSAGE_DETAIL, "PG_DIAG_MESSAGE_DETAIL"},
- {PG_DIAG_MESSAGE_HINT, "PG_DIAG_MESSAGE_HINT"},
- {PG_DIAG_SEVERITY_NONLOCALIZED, "PG_DIAG_SEVERITY_NONLOCALIZED"},
- {PG_DIAG_SQLSTATE, "PG_DIAG_SQLSTATE"},
- {PG_DIAG_STATEMENT_POSITION, "PG_DIAG_STATEMENT_POSITION"},
- {PG_DIAG_SCHEMA_NAME, "PG_DIAG_SCHEMA_NAME"},
- {PG_DIAG_TABLE_NAME, "PG_DIAG_TABLE_NAME"},
-};
-} // namespace
-
AdbcStatusCode SetError(struct AdbcError* error, PGresult* result, const char*
format,
...) {
+ if (error && error->release) {
+ // TODO: combine the errors if possible
+ error->release(error);
+ }
+
va_list args;
va_start(args, format);
- SetErrorVariadic(error, format, args);
+ std::string message;
+ message.resize(1024);
+ int chars_needed = vsnprintf(message.data(), message.size(), format, args);
va_end(args);
- AdbcStatusCode code = ADBC_STATUS_IO;
-
- const char* sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
- if (sqlstate) {
- // https://www.postgresql.org/docs/current/errcodes-appendix.html
- // This can be extended in the future
- if (std::strcmp(sqlstate, "57014") == 0) {
- code = ADBC_STATUS_CANCELLED;
- } else if (std::strcmp(sqlstate, "42P01") == 0 ||
- std::strcmp(sqlstate, "42602") == 0) {
- code = ADBC_STATUS_NOT_FOUND;
- } else if (std::strncmp(sqlstate, "42", 0) == 0) {
- // Class 42 — Syntax Error or Access Rule Violation
- code = ADBC_STATUS_INVALID_ARGUMENT;
- }
-
- static_assert(sizeof(error->sqlstate) == 5, "");
- // N.B. strncpy generates warnings when used for this purpose
- int i = 0;
- for (; sqlstate[i] != '\0' && i < 5; i++) {
- error->sqlstate[i] = sqlstate[i];
- }
- for (; i < 5; i++) {
- error->sqlstate[i] = '\0';
- }
+ if (chars_needed > 0) {
+ message.resize(chars_needed);
+ } else {
+ message.resize(0);
}
- for (const auto& field : kDetailFields) {
- const char* value = PQresultErrorField(result, field.code);
- if (value) {
- AppendErrorDetail(error, field.key.c_str(), reinterpret_cast<const
uint8_t*>(value),
- std::strlen(value));
- }
- }
- return code;
+ return MakeStatus(result, "{}", message).ToAdbc(error);
}
} // namespace adbcpq
diff --git a/c/driver/postgresql/error.h b/c/driver/postgresql/error.h
index 825de97b9..f24d41754 100644
--- a/c/driver/postgresql/error.h
+++ b/c/driver/postgresql/error.h
@@ -19,11 +19,42 @@
#pragma once
+#include <string>
+#include <vector>
+
#include <arrow-adbc/adbc.h>
#include <libpq-fe.h>
+#include <fmt/core.h>
+
+#include "driver/framework/status.h"
+
+using adbc::driver::Status;
+
namespace adbcpq {
+struct DetailField {  // pairs a PG_DIAG_* field code with the detail key it is reported under
+ int code;
+ std::string key;
+};
+
+static const std::vector<DetailField> kDetailFields = {  // PGresult error fields that MakeStatus() copies into the Status details
+ {PG_DIAG_COLUMN_NAME, "PG_DIAG_COLUMN_NAME"},
+ {PG_DIAG_CONTEXT, "PG_DIAG_CONTEXT"},
+ {PG_DIAG_CONSTRAINT_NAME, "PG_DIAG_CONSTRAINT_NAME"},
+ {PG_DIAG_DATATYPE_NAME, "PG_DIAG_DATATYPE_NAME"},
+ {PG_DIAG_INTERNAL_POSITION, "PG_DIAG_INTERNAL_POSITION"},
+ {PG_DIAG_INTERNAL_QUERY, "PG_DIAG_INTERNAL_QUERY"},
+ {PG_DIAG_MESSAGE_PRIMARY, "PG_DIAG_MESSAGE_PRIMARY"},
+ {PG_DIAG_MESSAGE_DETAIL, "PG_DIAG_MESSAGE_DETAIL"},
+ {PG_DIAG_MESSAGE_HINT, "PG_DIAG_MESSAGE_HINT"},
+ {PG_DIAG_SEVERITY_NONLOCALIZED, "PG_DIAG_SEVERITY_NONLOCALIZED"},
+ {PG_DIAG_SQLSTATE, "PG_DIAG_SQLSTATE"},
+ {PG_DIAG_STATEMENT_POSITION, "PG_DIAG_STATEMENT_POSITION"},
+ {PG_DIAG_SCHEMA_NAME, "PG_DIAG_SCHEMA_NAME"},
+ {PG_DIAG_TABLE_NAME, "PG_DIAG_TABLE_NAME"},
+};
+
// The printf checking attribute doesn't work properly on gcc 4.8
// and results in spurious compiler warnings
#if defined(__clang__) || (defined(__GNUC__) && __GNUC__ >= 5)
@@ -33,10 +64,50 @@ namespace adbcpq {
#endif
/// \brief Set an error based on a PGresult, inferring the proper ADBC status
-/// code from the PGresult.
+/// code from the PGresult. Deprecated and is currently a thin wrapper around
+/// MakeStatus() below.
AdbcStatusCode SetError(struct AdbcError* error, PGresult* result, const char*
format,
...) ADBC_CHECK_PRINTF_ATTRIBUTE(3, 4);
#undef ADBC_CHECK_PRINTF_ATTRIBUTE
+/// \brief Build a Status from a PGresult and a fmt-style message.
+///
+/// The message is format_string rendered with args via fmt. The status code
+/// defaults to ADBC_STATUS_IO and is refined from the result's SQLSTATE when
+/// one is available:
+///   - 57014 -> ADBC_STATUS_CANCELLED
+///   - 42P01, 42602 -> ADBC_STATUS_NOT_FOUND
+///   - any other class 42 code -> ADBC_STATUS_INVALID_ARGUMENT
+/// The SQLSTATE and every available field listed in kDetailFields are attached
+/// to the returned Status.
+///
+/// \param result the PGresult to inspect; may be nullptr, in which case only
+///   the code and message are set
+/// \param format_string a fmt format string
+/// \param args values substituted into format_string
+template <typename... Args>
+Status MakeStatus(PGresult* result, const char* format_string, Args&&... args) {
+  auto message = ::fmt::vformat(format_string, ::fmt::make_format_args(args...));
+
+  AdbcStatusCode code = ADBC_STATUS_IO;
+  if (result == nullptr) {
+    return Status(code, message);
+  }
+
+  // NULL when the result carries no SQLSTATE field
+  const char* sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+  if (sqlstate) {
+    // https://www.postgresql.org/docs/current/errcodes-appendix.html
+    // This can be extended in the future
+    if (std::strcmp(sqlstate, "57014") == 0) {
+      code = ADBC_STATUS_CANCELLED;
+    } else if (std::strcmp(sqlstate, "42P01") == 0 ||
+               std::strcmp(sqlstate, "42602") == 0) {
+      code = ADBC_STATUS_NOT_FOUND;
+    } else if (std::strncmp(sqlstate, "42", 2) == 0) {
+      // Class 42 — Syntax Error or Access Rule Violation
+      // (compare exactly the two class characters; a length of 0 would
+      // match every SQLSTATE)
+      code = ADBC_STATUS_INVALID_ARGUMENT;
+    }
+  }
+
+  Status status(code, message);
+  if (sqlstate) {
+    // Guarded: constructing the std::string argument from NULL is undefined
+    status.SetSqlState(sqlstate);
+  }
+
+  for (const auto& field : kDetailFields) {
+    const char* value = PQresultErrorField(result, field.code);
+    if (value) {
+      status.AddDetail(field.key, value);
+    }
+  }
+
+  return status;
+}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/postgresql.cc
b/c/driver/postgresql/postgresql.cc
index 033c44648..e43db9887 100644
--- a/c/driver/postgresql/postgresql.cc
+++ b/c/driver/postgresql/postgresql.cc
@@ -25,8 +25,10 @@
#include "connection.h"
#include "database.h"
#include "driver/common/utils.h"
+#include "driver/framework/status.h"
#include "statement.h"
+using adbc::driver::Status;
using adbcpq::PostgresConnection;
using adbcpq::PostgresDatabase;
using adbcpq::PostgresStatement;
@@ -56,14 +58,36 @@ const struct AdbcError* PostgresErrorFromArrayStream(struct
ArrowArrayStream* st
// Currently only valid for TupleReader
return adbcpq::TupleReader::ErrorFromArrayStream(stream, status);
}
+
+int PostgresErrorGetDetailCount(const struct AdbcError* error) {
+ if (IsCommonError(error)) {
+ return CommonErrorGetDetailCount(error);
+ }
+
+ if (error->vendor_code != ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA) {
+ return 0;
+ }
+
+ auto error_obj = reinterpret_cast<Status*>(error->private_data);
+ return error_obj->CDetailCount();
+}
+
+struct AdbcErrorDetail PostgresErrorGetDetail(const struct AdbcError* error,
int index) {
+ if (IsCommonError(error)) {
+ return CommonErrorGetDetail(error, index);
+ }
+
+ auto error_obj = reinterpret_cast<Status*>(error->private_data);
+ return error_obj->CDetail(index);
+}
} // namespace
int AdbcErrorGetDetailCount(const struct AdbcError* error) {
- return CommonErrorGetDetailCount(error);
+ return PostgresErrorGetDetailCount(error);
}
struct AdbcErrorDetail AdbcErrorGetDetail(const struct AdbcError* error, int
index) {
- return CommonErrorGetDetail(error, index);
+ return PostgresErrorGetDetail(error, index);
}
const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream*
stream,
@@ -860,8 +884,8 @@ AdbcStatusCode PostgresqlDriverInit(int version, void*
raw_driver,
if (version >= ADBC_VERSION_1_1_0) {
std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE);
- driver->ErrorGetDetailCount = CommonErrorGetDetailCount;
- driver->ErrorGetDetail = CommonErrorGetDetail;
+ driver->ErrorGetDetailCount = PostgresErrorGetDetailCount;
+ driver->ErrorGetDetail = PostgresErrorGetDetail;
driver->ErrorFromArrayStream = PostgresErrorFromArrayStream;
driver->DatabaseGetOption = PostgresDatabaseGetOption;
diff --git a/c/driver/postgresql/result_helper.cc
b/c/driver/postgresql/result_helper.cc
index 157b100b7..aa8a8d9dd 100644
--- a/c/driver/postgresql/result_helper.cc
+++ b/c/driver/postgresql/result_helper.cc
@@ -20,56 +20,51 @@
#include <charconv>
#include <memory>
-#include "driver/common/utils.h"
+#define ADBC_FRAMEWORK_USE_FMT
+#include "driver/framework/status.h"
#include "error.h"
namespace adbcpq {
PqResultHelper::~PqResultHelper() { ClearResult(); }
-AdbcStatusCode PqResultHelper::PrepareInternal(int n_params, const Oid*
param_oids,
- struct AdbcError* error) {
+// Prepare query_ on conn_ as the unnamed statement with the given parameter
+// type OIDs. Returns Status::Ok() when PQprepare reports PGRES_COMMAND_OK;
+// otherwise returns a Status built from the failed result. The PGresult is
+// cleared on both paths.
+Status PqResultHelper::PrepareInternal(int n_params, const Oid* param_oids) {
// TODO: make stmtName a unique identifier?
PGresult* result =
PQprepare(conn_, /*stmtName=*/"", query_.c_str(), n_params, param_oids);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
- PQerrorMessage(conn_), query_.c_str());
+    // Keep the "[libpq] " prefix used by the previous message and by the
+    // other MakeStatus() calls in this file. The status must be built before
+    // PQclear() because MakeStatus() reads fields from the result.
+    auto status =
+        MakeStatus(result, "[libpq] Failed to prepare query: {}\nQuery was:{}",
+                   PQerrorMessage(conn_), query_.c_str());
PQclear(result);
- return code;
+    return status;
}
PQclear(result);
- return ADBC_STATUS_OK;
+  return Status::Ok();
}
-AdbcStatusCode PqResultHelper::Prepare(struct AdbcError* error) {
- return PrepareInternal(0, nullptr, error);
-}
+Status PqResultHelper::Prepare() { return PrepareInternal(0, nullptr); }
-AdbcStatusCode PqResultHelper::Prepare(const std::vector<Oid>& param_oids,
- struct AdbcError* error) {
- return PrepareInternal(param_oids.size(), param_oids.data(), error);
+Status PqResultHelper::Prepare(const std::vector<Oid>& param_oids) {
+ return PrepareInternal(param_oids.size(), param_oids.data());
}
-AdbcStatusCode PqResultHelper::DescribePrepared(struct AdbcError* error) {
+Status PqResultHelper::DescribePrepared() {
ClearResult();
result_ = PQdescribePrepared(conn_, /*stmtName=*/"");
if (PQresultStatus(result_) != PGRES_COMMAND_OK) {
- AdbcStatusCode code = SetError(
- error, result_, "[libpq] Failed to describe prepared statement:
%s\nQuery was:%s",
+ Status status = MakeStatus(
+ result_, "[libpq] Failed to describe prepared statement: {}\nQuery
was:{}",
PQerrorMessage(conn_), query_.c_str());
ClearResult();
- return code;
+ return status;
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultHelper::Execute(struct AdbcError* error,
- const std::vector<std::string>& params,
- PostgresType* param_types) {
+Status PqResultHelper::Execute(const std::vector<std::string>& params,
+ PostgresType* param_types) {
if (params.size() == 0 && param_types == nullptr && output_format_ ==
Format::kText) {
ClearResult();
result_ = PQexec(conn_, query_.c_str());
@@ -102,16 +97,14 @@ AdbcStatusCode PqResultHelper::Execute(struct AdbcError*
error,
ExecStatusType status = PQresultStatus(result_);
if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
- AdbcStatusCode status =
- SetError(error, result_, "[libpq] Failed to execute query '%s': %s",
- query_.c_str(), PQerrorMessage(conn_));
- return status;
+ return MakeStatus(result_, "[libpq] Failed to execute query '{}': {}",
query_.c_str(),
+ PQerrorMessage(conn_));
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultHelper::ExecuteCopy(struct AdbcError* error) {
+Status PqResultHelper::ExecuteCopy() {
// Remove trailing semicolon(s) from the query before feeding it into COPY
while (!query_.empty() && query_.back() == ';') {
query_.pop_back();
@@ -125,20 +118,19 @@ AdbcStatusCode PqResultHelper::ExecuteCopy(struct
AdbcError* error) {
static_cast<int>(Format::kBinary));
if (PQresultStatus(result_) != PGRES_COPY_OUT) {
- AdbcStatusCode code = SetError(
- error, result_,
- "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was:
%s",
+ Status status = MakeStatus(
+ result_,
+ "[libpq] Failed to execute query: could not begin COPY: {}\nQuery was:
{}",
PQerrorMessage(conn_), copy_query.c_str());
ClearResult();
- return code;
+ return status;
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultHelper::ResolveParamTypes(PostgresTypeResolver&
type_resolver,
- PostgresType* param_types,
- struct AdbcError* error) {
+Status PqResultHelper::ResolveParamTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* param_types) {
struct ArrowError na_error;
ArrowErrorInit(&na_error);
@@ -149,22 +141,22 @@ AdbcStatusCode
PqResultHelper::ResolveParamTypes(PostgresTypeResolver& type_reso
const Oid pg_oid = PQparamtype(result_, i);
PostgresType pg_type;
if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
- SetError(error, "%s%d%s%s%s%d", "[libpq] Parameter #", i + 1, " (\"",
- PQfname(result_, i), "\") has unknown type code ", pg_oid);
+ Status status = Status::NotImplemented("[libpq] Parameter #", i + 1, "
(\"",
+ PQfname(result_, i),
+ "\") has unknown type code ",
pg_oid);
ClearResult();
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return status;
}
root_type.AppendChild(PQfname(result_, i), pg_type);
}
*param_types = root_type;
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultHelper::ResolveOutputTypes(PostgresTypeResolver&
type_resolver,
- PostgresType* result_types,
- struct AdbcError* error) {
+Status PqResultHelper::ResolveOutputTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* result_types) {
struct ArrowError na_error;
ArrowErrorInit(&na_error);
@@ -175,17 +167,18 @@ AdbcStatusCode
PqResultHelper::ResolveOutputTypes(PostgresTypeResolver& type_res
const Oid pg_oid = PQftype(result_, i);
PostgresType pg_type;
if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
- SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"",
- PQfname(result_, i), "\") has unknown type code ", pg_oid);
+ Status status =
+ Status::NotImplemented("[libpq] Column #", i + 1, " (\"",
PQfname(result_, i),
+ "\") has unknown type code ", pg_oid);
ClearResult();
- return ADBC_STATUS_NOT_IMPLEMENTED;
+ return status;
}
root_type.AppendChild(PQfname(result_, i), pg_type);
}
*result_types = root_type;
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
PGresult* PqResultHelper::ReleaseResult() {
diff --git a/c/driver/postgresql/result_helper.h
b/c/driver/postgresql/result_helper.h
index 18de7958b..0880cbc85 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -28,6 +28,9 @@
#include <libpq-fe.h>
#include "copy/reader.h"
+#include "driver/framework/status.h"
+
+using adbc::driver::Status;
namespace adbcpq {
@@ -95,17 +98,16 @@ class PqResultHelper {
void set_param_format(Format format) { param_format_ = format; }
void set_output_format(Format format) { output_format_ = format; }
- AdbcStatusCode Prepare(struct AdbcError* error);
- AdbcStatusCode Prepare(const std::vector<Oid>& param_oids, struct AdbcError*
error);
- AdbcStatusCode DescribePrepared(struct AdbcError* error);
- AdbcStatusCode Execute(struct AdbcError* error,
- const std::vector<std::string>& params = {},
- PostgresType* param_types = nullptr);
- AdbcStatusCode ExecuteCopy(struct AdbcError* error);
- AdbcStatusCode ResolveParamTypes(PostgresTypeResolver& type_resolver,
- PostgresType* param_types, struct
AdbcError* error);
- AdbcStatusCode ResolveOutputTypes(PostgresTypeResolver& type_resolver,
- PostgresType* result_types, struct
AdbcError* error);
+ Status Prepare();
+ Status Prepare(const std::vector<Oid>& param_oids);
+ Status DescribePrepared();
+ Status Execute(const std::vector<std::string>& params = {},
+ PostgresType* param_types = nullptr);
+ Status ExecuteCopy();
+ Status ResolveParamTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* param_types);
+ Status ResolveOutputTypes(PostgresTypeResolver& type_resolver,
+ PostgresType* result_types);
bool HasResult() { return result_ != nullptr; }
@@ -170,8 +172,7 @@ class PqResultHelper {
Format param_format_ = Format::kText;
Format output_format_ = Format::kText;
- AdbcStatusCode PrepareInternal(int n_params, const Oid* param_oids,
- struct AdbcError* error);
+ Status PrepareInternal(int n_params, const Oid* param_oids);
};
} // namespace adbcpq
diff --git a/c/driver/postgresql/result_reader.cc
b/c/driver/postgresql/result_reader.cc
index c29770309..7da936494 100644
--- a/c/driver/postgresql/result_reader.cc
+++ b/c/driver/postgresql/result_reader.cc
@@ -21,9 +21,7 @@
#include <utility>
#include "copy/reader.h"
-#include "driver/common/utils.h"
-
-#include "error.h"
+#include "driver/framework/status.h"
namespace adbcpq {
@@ -31,8 +29,9 @@ int PqResultArrayReader::GetSchema(struct ArrowSchema* out) {
ResetErrors();
if (schema_->release == nullptr) {
- AdbcStatusCode status = Initialize(nullptr, &error_);
- if (status != ADBC_STATUS_OK) {
+ Status status = Initialize(nullptr);
+ if (!status.ok()) {
+ status.ToAdbc(&error_);
return EINVAL;
}
}
@@ -43,10 +42,11 @@ int PqResultArrayReader::GetSchema(struct ArrowSchema* out)
{
int PqResultArrayReader::GetNext(struct ArrowArray* out) {
ResetErrors();
- AdbcStatusCode status;
+ Status status;
if (schema_->release == nullptr) {
- AdbcStatusCode status = Initialize(nullptr, &error_);
- if (status != ADBC_STATUS_OK) {
+ status = Initialize(nullptr);
+ if (!status.ok()) {
+ status.ToAdbc(&error_);
return EINVAL;
}
}
@@ -63,8 +63,9 @@ int PqResultArrayReader::GetNext(struct ArrowArray* out) {
}
// Keep binding and executing until we have a result to return
- status = BindNextAndExecute(nullptr, &error_);
- if (status != ADBC_STATUS_OK) {
+ status = BindNextAndExecute(nullptr);
+ if (!status.ok()) {
+ status.ToAdbc(&error_);
return EIO;
}
@@ -133,25 +134,33 @@ const char* PqResultArrayReader::GetLastError() {
}
}
-AdbcStatusCode PqResultArrayReader::Initialize(int64_t* rows_affected,
- struct AdbcError* error) {
+Status PqResultArrayReader::Initialize(int64_t* rows_affected) {
helper_.set_output_format(PqResultHelper::Format::kBinary);
helper_.set_param_format(PqResultHelper::Format::kBinary);
// If we have to do binding, set up the bind stream an execute until
// there is a result with more than zero rows to populate.
+ AdbcStatusCode status_code;
if (bind_stream_) {
- RAISE_ADBC(bind_stream_->Begin([] { return ADBC_STATUS_OK; }, error));
- RAISE_ADBC(bind_stream_->SetParamTypes(conn_, *type_resolver_,
autocommit_, error));
- RAISE_ADBC(helper_.Prepare(bind_stream_->param_types, error));
+ status_code = bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
+ status_code =
+ bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_,
&error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
- RAISE_ADBC(BindNextAndExecute(nullptr, error));
+ UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
+
+ UNWRAP_STATUS(BindNextAndExecute(nullptr));
// If there were no arrays in the bind stream, we still need a result
// to populate the schema. If there were any arrays in the bind stream,
// the last one will still be in helper_ even if it had zero rows.
if (!helper_.HasResult()) {
- RAISE_ADBC(helper_.DescribePrepared(error));
+ UNWRAP_STATUS(helper_.DescribePrepared());
}
// We can't provide affected row counts if there is a bind stream and
@@ -161,7 +170,7 @@ AdbcStatusCode PqResultArrayReader::Initialize(int64_t*
rows_affected,
*rows_affected = -1;
}
} else {
- RAISE_ADBC(helper_.Execute(error));
+ UNWRAP_STATUS(helper_.Execute());
if (rows_affected != nullptr) {
*rows_affected = helper_.AffectedRows();
}
@@ -169,90 +178,104 @@ AdbcStatusCode PqResultArrayReader::Initialize(int64_t*
rows_affected,
// Build the schema for which we are about to build results
ArrowSchemaInit(schema_.get());
- CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(),
helper_.NumColumns()),
- &na_error_, error);
+ UNWRAP_NANOARROW(na_error_, Internal,
+ ArrowSchemaSetTypeStruct(schema_.get(),
helper_.NumColumns()));
for (int i = 0; i < helper_.NumColumns(); i++) {
PostgresType child_type;
- CHECK_NA_DETAIL(INTERNAL,
- type_resolver_->Find(helper_.FieldType(i), &child_type,
&na_error_),
- &na_error_, error);
+ UNWRAP_NANOARROW(na_error_, Internal,
+ type_resolver_->Find(helper_.FieldType(i), &child_type,
&na_error_));
- CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error);
- CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i],
helper_.FieldName(i)),
- error);
+ UNWRAP_ERRNO(Internal, child_type.SetSchema(schema_->children[i]));
+ UNWRAP_ERRNO(Internal,
+ ArrowSchemaSetName(schema_->children[i],
helper_.FieldName(i)));
std::unique_ptr<PostgresCopyFieldReader> child_reader;
- CHECK_NA_DETAIL(
- INTERNAL,
- MakeCopyFieldReader(child_type, schema_->children[i], &child_reader,
&na_error_),
- &na_error_, error);
+ UNWRAP_NANOARROW(
+ na_error_, Internal,
+ MakeCopyFieldReader(child_type, schema_->children[i], &child_reader,
&na_error_));
child_reader->Init(child_type);
- CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]),
&na_error_,
- error);
+ UNWRAP_NANOARROW(na_error_, Internal,
child_reader->InitSchema(schema_->children[i]));
field_readers_.push_back(std::move(child_reader));
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
- struct ArrowArrayStream* out,
- struct AdbcError* error) {
+Status PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
+ struct ArrowArrayStream* out) {
if (out == nullptr) {
// If there is no output requested, we still need to execute and
// set affected_rows if needed. We don't need an output schema or to set
up a copy
// reader, so we can skip those steps by going straight to Execute(). This
also
// enables us to support queries with multiple statements because we can
call PQexec()
// instead of PQexecParams().
- RAISE_ADBC(ExecuteAll(affected_rows, error));
- return ADBC_STATUS_OK;
+ UNWRAP_STATUS(ExecuteAll(affected_rows));
+ return Status::Ok();
}
// Otherwise, execute until we have a result to return. We need this to
provide row
// counts for DELETE and CREATE TABLE queries as well as to provide more
informative
// errors until this reader class is wired up to provide extended AdbcError
information.
- RAISE_ADBC(Initialize(affected_rows, error));
+ UNWRAP_STATUS(Initialize(affected_rows));
nanoarrow::ArrayStreamFactory<PqResultArrayReader>::InitArrayStream(
new PqResultArrayReader(this), out);
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultArrayReader::BindNextAndExecute(int64_t* affected_rows,
- AdbcError* error) {
+Status PqResultArrayReader::BindNextAndExecute(int64_t* affected_rows) {
// Keep pulling from the bind stream and executing as long as
// we receive results with zero rows.
+ AdbcStatusCode status_code;
do {
- RAISE_ADBC(bind_stream_->EnsureNextRow(error));
+ status_code = bind_stream_->EnsureNextRow(&error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
+
if (!bind_stream_->current->release) {
- RAISE_ADBC(bind_stream_->Cleanup(conn_, error));
+ status_code = bind_stream_->Cleanup(conn_, &error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
bind_stream_.reset();
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
PGresult* result;
- RAISE_ADBC(bind_stream_->BindAndExecuteCurrentRow(
- conn_, &result, /*result_format*/ kPgBinaryFormat, error));
+ status_code = bind_stream_->BindAndExecuteCurrentRow(
+ conn_, &result, /*result_format*/ kPgBinaryFormat, &error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
helper_.SetResult(result);
if (affected_rows) {
(*affected_rows) += helper_.AffectedRows();
}
} while (helper_.NumRows() == 0);
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
-AdbcStatusCode PqResultArrayReader::ExecuteAll(int64_t* affected_rows,
AdbcError* error) {
+Status PqResultArrayReader::ExecuteAll(int64_t* affected_rows) {
// For the case where we don't need a result, we either need to exhaust the bind
// stream (if there is one) or execute the query without binding.
if (bind_stream_) {
- RAISE_ADBC(bind_stream_->Begin([] { return ADBC_STATUS_OK; }, error));
- RAISE_ADBC(bind_stream_->SetParamTypes(conn_, *type_resolver_,
autocommit_, error));
- RAISE_ADBC(helper_.Prepare(bind_stream_->param_types, error));
+ AdbcStatusCode status_code =
+ bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
+ status_code =
+ bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_,
&error_);
+ if (status_code != ADBC_STATUS_OK) {
+ return Status::FromAdbc(status_code, error_);
+ }
+ UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
// Reset affected rows to zero before binding and executing any
if (affected_rows) {
@@ -260,17 +283,17 @@ AdbcStatusCode PqResultArrayReader::ExecuteAll(int64_t*
affected_rows, AdbcError
}
do {
- RAISE_ADBC(BindNextAndExecute(affected_rows, error));
+ UNWRAP_STATUS(BindNextAndExecute(affected_rows));
} while (bind_stream_);
} else {
- RAISE_ADBC(helper_.Execute(error));
+ UNWRAP_STATUS(helper_.Execute());
if (affected_rows != nullptr) {
*affected_rows = helper_.AffectedRows();
}
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
} // namespace adbcpq
diff --git a/c/driver/postgresql/result_reader.h
b/c/driver/postgresql/result_reader.h
index cbbff6a58..5c36dccb2 100644
--- a/c/driver/postgresql/result_reader.h
+++ b/c/driver/postgresql/result_reader.h
@@ -62,10 +62,9 @@ class PqResultArrayReader {
int GetNext(struct ArrowArray* out);
const char* GetLastError();
- AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct
ArrowArrayStream* out,
- struct AdbcError* error);
+ Status ToArrayStream(int64_t* affected_rows, struct ArrowArrayStream* out);
- AdbcStatusCode Initialize(int64_t* affected_rows, struct AdbcError* error);
+ Status Initialize(int64_t* affected_rows);
private:
PGconn* conn_;
@@ -89,8 +88,8 @@ class PqResultArrayReader {
error_ = ADBC_ERROR_INIT;
}
- AdbcStatusCode BindNextAndExecute(int64_t* affected_rows, AdbcError* error);
- AdbcStatusCode ExecuteAll(int64_t* affected_rows, AdbcError* error);
+ Status BindNextAndExecute(int64_t* affected_rows);
+ Status ExecuteAll(int64_t* affected_rows);
void ResetErrors() {
ArrowErrorInit(&na_error_);
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index c5252b7db..fe30f708b 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -459,7 +459,7 @@ AdbcStatusCode PostgresStatement::ExecuteBind(struct
ArrowArrayStream* stream,
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
reader.SetAutocommit(connection_->autocommit());
reader.SetBind(&bind_);
- RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+ RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
@@ -487,25 +487,25 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
// execute using the PqResultArrayReader.
if (!stream || !use_copy_) {
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
- RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+ RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
PqResultHelper helper(connection_->conn(), query_);
- RAISE_ADBC(helper.Prepare(error));
- RAISE_ADBC(helper.DescribePrepared(error));
+ RAISE_STATUS(error, helper.Prepare());
+ RAISE_STATUS(error, helper.DescribePrepared());
// Initialize the copy reader and infer the output schema (i.e., error for
// unsupported types before issuing the COPY query). This could be lazier
// (i.e., executed on the first call to GetSchema() or GetNext()).
PostgresType root_type;
- RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &root_type, error));
+ RAISE_STATUS(error, helper.ResolveOutputTypes(*type_resolver_, &root_type));
// If there will be no columns in the result, we can also avoid COPY
if (root_type.n_children() == 0) {
// Could/should move the helper into the reader instead of repreparing
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
- RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+ RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
@@ -519,7 +519,7 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
error);
// Execute the COPY query
- RAISE_ADBC(helper.ExecuteCopy(error));
+ RAISE_STATUS(error, helper.ExecuteCopy());
// We need the PQresult back for the reader
reader_.result_ = helper.ReleaseResult();
@@ -562,15 +562,15 @@ AdbcStatusCode PostgresStatement::ExecuteSchema(struct
ArrowSchema* schema,
param_oids[i] = pg_type.oid();
}
- RAISE_ADBC(helper.Prepare(param_oids, error));
+ RAISE_STATUS(error, helper.Prepare(param_oids));
} else {
- RAISE_ADBC(helper.Prepare(error));
+ RAISE_STATUS(error, helper.Prepare());
}
- RAISE_ADBC(helper.DescribePrepared(error));
+ RAISE_STATUS(error, helper.DescribePrepared());
PostgresType output_type;
- RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &output_type, error));
+ RAISE_STATUS(error, helper.ResolveOutputTypes(*type_resolver_,
&output_type));
nanoarrow::UniqueSchema tmp;
ArrowSchemaInit(tmp.get());
@@ -598,7 +598,7 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct
ArrowArrayStream* stream,
std::string current_schema;
{
PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA"};
- RAISE_ADBC(result_helper.Execute(error));
+ RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");