This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 70a20030f refactor(c/driver/postgresql): Use Status for error handling
in BindStream (#2187)
70a20030f is described below
commit 70a20030f4982dfb52c5905182c1eaabced7afc3
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Sep 25 11:14:24 2024 -0500
refactor(c/driver/postgresql): Use Status for error handling in BindStream
(#2187)
This PR migrates `BindStream` to use `PqResultHelper` and
`Status`.
I believe the CI failures are still glib and Python packaging.
---
c/driver/postgresql/bind_stream.h | 240 +++++++++++++----------------------
c/driver/postgresql/result_helper.h | 2 +
c/driver/postgresql/result_reader.cc | 43 ++-----
c/driver/postgresql/statement.cc | 18 +--
4 files changed, 111 insertions(+), 192 deletions(-)
diff --git a/c/driver/postgresql/bind_stream.h
b/c/driver/postgresql/bind_stream.h
index e5e218ea4..df0b9d2ca 100644
--- a/c/driver/postgresql/bind_stream.h
+++ b/c/driver/postgresql/bind_stream.h
@@ -27,10 +27,10 @@
#include <arrow-adbc/adbc.h>
#include "copy/writer.h"
-#include "driver/common/utils.h"
#include "error.h"
#include "postgres_type.h"
#include "postgres_util.h"
+#include "result_helper.h"
namespace adbcpq {
@@ -71,40 +71,37 @@ struct BindStream {
}
template <typename Callback>
- AdbcStatusCode Begin(Callback&& callback, struct AdbcError* error) {
- CHECK_NA_DETAIL(INTERNAL,
- ArrowArrayStreamGetSchema(&bind.value, &bind_schema.value,
&na_error),
- &na_error, error);
+ Status Begin(Callback&& callback) {
+ UNWRAP_NANOARROW(
+ na_error, Internal,
+ ArrowArrayStreamGetSchema(&bind.value, &bind_schema.value, &na_error));
struct ArrowSchemaView bind_schema_view;
- CHECK_NA_DETAIL(INTERNAL,
- ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value,
&na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(
+ na_error, Internal,
+ ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value, &na_error));
if (bind_schema_view.type != ArrowType::NANOARROW_TYPE_STRUCT) {
- SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
- return ADBC_STATUS_INVALID_STATE;
+ return Status::InvalidState("[libpq] Bind parameters must have type
STRUCT");
}
bind_schema_fields.resize(bind_schema->n_children);
for (size_t i = 0; i < bind_schema_fields.size(); i++) {
- CHECK_NA(INTERNAL,
- ArrowSchemaViewInit(&bind_schema_fields[i],
bind_schema->children[i],
- /*error*/ nullptr),
- error);
+ UNWRAP_ERRNO(Internal,
+ ArrowSchemaViewInit(&bind_schema_fields[i],
bind_schema->children[i],
+ /*error*/ nullptr));
}
- CHECK_NA_DETAIL(
- INTERNAL,
- ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value,
&na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(
+ na_error, Internal,
+ ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value,
&na_error));
ArrowBufferInit(¶m_buffer.value);
return std::move(callback)();
}
- AdbcStatusCode SetParamTypes(PGconn* pg_conn, const PostgresTypeResolver&
type_resolver,
- const bool autocommit, struct AdbcError* error)
{
+ Status SetParamTypes(PGconn* pg_conn, const PostgresTypeResolver&
type_resolver,
+ const bool autocommit) {
param_types.resize(bind_schema->n_children);
param_values.resize(bind_schema->n_children);
param_lengths.resize(bind_schema->n_children);
@@ -113,126 +110,93 @@ struct BindStream {
for (size_t i = 0; i < bind_field_writers.size(); i++) {
PostgresType type;
- CHECK_NA_DETAIL(INTERNAL,
- PostgresType::FromSchema(type_resolver,
bind_schema->children[i],
- &type, &na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(na_error, Internal,
+ PostgresType::FromSchema(type_resolver,
bind_schema->children[i],
+ &type, &na_error));
// tz-aware timestamps require special handling to set the timezone to
UTC
// prior to sending over the binary protocol; must be reset after execute
if (!has_tz_field && type.type_id() == PostgresTypeId::kTimestamptz) {
- RAISE_ADBC(SetDatabaseTimezoneUTC(pg_conn, autocommit, error));
+ UNWRAP_STATUS(SetDatabaseTimezoneUTC(pg_conn, autocommit));
has_tz_field = true;
}
std::unique_ptr<PostgresCopyFieldWriter> writer;
- CHECK_NA_DETAIL(
- INTERNAL,
+ UNWRAP_NANOARROW(
+ na_error, Internal,
MakeCopyFieldWriter(bind_schema->children[i],
array_view->children[i],
- type_resolver, &writer, &na_error),
- &na_error, error);
+ type_resolver, &writer, &na_error));
param_types[i] = type.oid();
param_formats[i] = kPgBinaryFormat;
bind_field_writers[i] = std::move(writer);
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode SetDatabaseTimezoneUTC(PGconn* pg_conn, const bool autocommit,
- struct AdbcError* error) {
+ Status SetDatabaseTimezoneUTC(PGconn* pg_conn, const bool autocommit) {
if (autocommit) {
- PGresult* begin_result = PQexec(pg_conn, "BEGIN");
- if (PQresultStatus(begin_result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, begin_result,
- "[libpq] Failed to begin transaction for timezone data:
%s",
- PQerrorMessage(pg_conn));
- PQclear(begin_result);
- return code;
- }
- PQclear(begin_result);
+ PqResultHelper helper(pg_conn, "BEGIN");
+ UNWRAP_STATUS(helper.Execute());
}
- PGresult* get_tz_result = PQexec(pg_conn, "SELECT
current_setting('TIMEZONE')");
- if (PQresultStatus(get_tz_result) != PGRES_TUPLES_OK) {
- AdbcStatusCode code =
- SetError(error, get_tz_result, "[libpq] Could not query current
timezone: %s",
- PQerrorMessage(pg_conn));
- PQclear(get_tz_result);
- return code;
+ PqResultHelper get_tz(pg_conn, "SELECT current_setting('TIMEZONE')");
+ UNWRAP_STATUS(get_tz.Execute());
+ for (auto row : get_tz) {
+ tz_setting = row[0].value();
}
- tz_setting = std::string(PQgetvalue(get_tz_result, 0, 0));
- PQclear(get_tz_result);
-
- PGresult* set_utc_result = PQexec(pg_conn, "SET TIME ZONE 'UTC'");
- if (PQresultStatus(set_utc_result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, set_utc_result, "[libpq] Failed to set time zone to
UTC: %s",
- PQerrorMessage(pg_conn));
- PQclear(set_utc_result);
- return code;
- }
- PQclear(set_utc_result);
+ PqResultHelper set_utc(pg_conn, "SET TIME ZONE 'UTC'");
+ UNWRAP_STATUS(set_utc.Execute());
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode Prepare(PGconn* pg_conn, const std::string& query,
- struct AdbcError* error) {
- PGresult* result = PQprepare(pg_conn, /*stmtName=*/"", query.c_str(),
- /*nParams=*/bind_schema->n_children,
param_types.data());
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
- PQerrorMessage(pg_conn), query.c_str());
- PQclear(result);
- return code;
- }
- PQclear(result);
- return ADBC_STATUS_OK;
+ Status Prepare(PGconn* pg_conn, const std::string& query) {
+ PqResultHelper helper(pg_conn, query);
+ UNWRAP_STATUS(helper.Prepare(param_types));
+ return Status::Ok();
}
- AdbcStatusCode PullNextArray(AdbcError* error) {
+ Status PullNextArray() {
if (current->release != nullptr) ArrowArrayRelease(¤t.value);
- CHECK_NA_DETAIL(IO, ArrowArrayStreamGetNext(&bind.value, ¤t.value,
&na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(na_error, IO,
+ ArrowArrayStreamGetNext(&bind.value, ¤t.value,
&na_error));
if (current->release != nullptr) {
- CHECK_NA_DETAIL(
- INTERNAL, ArrowArrayViewSetArray(&array_view.value, ¤t.value,
&na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(
+ na_error, Internal,
+ ArrowArrayViewSetArray(&array_view.value, ¤t.value,
&na_error));
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode EnsureNextRow(AdbcError* error) {
+ Status EnsureNextRow() {
if (current->release != nullptr) {
current_row++;
if (current_row < current->length) {
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
}
// Pull until we have an array with at least one row or the stream is
finished
do {
- RAISE_ADBC(PullNextArray(error));
+ UNWRAP_STATUS(PullNextArray());
if (current->release == nullptr) {
current_row = -1;
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
} while (current->length == 0);
current_row = 0;
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode BindAndExecuteCurrentRow(PGconn* pg_conn, PGresult**
result_out,
- int result_format, AdbcError* error)
{
+ Status BindAndExecuteCurrentRow(PGconn* pg_conn, PGresult** result_out,
+ int result_format) {
param_buffer->size_bytes = 0;
int64_t last_offset = 0;
@@ -240,18 +204,16 @@ struct BindStream {
if (!ArrowArrayViewIsNull(array_view->children[col], current_row)) {
// Note that this Write() call currently writes the (int32_t) byte
size of the
// field in addition to the serialized value.
- CHECK_NA_DETAIL(
- INTERNAL,
- bind_field_writers[col]->Write(¶m_buffer.value, current_row,
&na_error),
- &na_error, error);
+ UNWRAP_NANOARROW(
+ na_error, Internal,
+ bind_field_writers[col]->Write(¶m_buffer.value, current_row,
&na_error));
} else {
- CHECK_NA(INTERNAL, ArrowBufferAppendInt32(¶m_buffer.value, 0),
error);
+ UNWRAP_ERRNO(Internal, ArrowBufferAppendInt32(¶m_buffer.value, 0));
}
int64_t param_length = param_buffer->size_bytes - last_offset -
sizeof(int32_t);
if (param_length > (std::numeric_limits<int>::max)()) {
- SetError(error, "Parameter %" PRId64 " serialized to >2GB of binary",
col);
- return ADBC_STATUS_INTERNAL;
+ return Status::Internal("Paramter ", col, "serialized to >2GB of
binary");
}
param_lengths[col] = static_cast<int>(param_length);
@@ -276,60 +238,45 @@ struct BindStream {
ExecStatusType pg_status = PQresultStatus(result);
if (pg_status != PGRES_COMMAND_OK && pg_status != PGRES_TUPLES_OK) {
- AdbcStatusCode code =
- SetError(error, result, "[libpq] Failed to execute prepared
statement: %s %s",
- PQresStatus(pg_status), PQerrorMessage(pg_conn));
+ Status status =
+ MakeStatus(result, "[libpq] Failed to execute prepared statement: {}
{}",
+ PQresStatus(pg_status), PQerrorMessage(pg_conn));
PQclear(result);
- return code;
+ return status;
}
*result_out = result;
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode Cleanup(PGconn* pg_conn, AdbcError* error) {
+ Status Cleanup(PGconn* pg_conn) {
if (has_tz_field) {
- std::string reset_query = "SET TIME ZONE '" + tz_setting + "'";
- PGresult* reset_tz_result = PQexec(pg_conn, reset_query.c_str());
- if (PQresultStatus(reset_tz_result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, reset_tz_result, "[libpq] Failed to reset time
zone: %s",
- PQerrorMessage(pg_conn));
- PQclear(reset_tz_result);
- return code;
- }
- PQclear(reset_tz_result);
-
- PGresult* commit_result = PQexec(pg_conn, "COMMIT");
- if (PQresultStatus(commit_result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, commit_result, "[libpq] Failed to commit
transaction: %s",
- PQerrorMessage(pg_conn));
- PQclear(commit_result);
- return code;
- }
- PQclear(commit_result);
+ PqResultHelper reset(pg_conn, "SET TIME ZONE '" + tz_setting + "'");
+ UNWRAP_STATUS(reset.Execute());
+
+ PqResultHelper commit(pg_conn, "COMMIT");
+ UNWRAP_STATUS(reset.Execute());
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver&
type_resolver,
- int64_t* rows_affected, struct AdbcError* error) {
+ Status ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver&
type_resolver,
+ int64_t* rows_affected) {
if (rows_affected) *rows_affected = 0;
PostgresCopyStreamWriter writer;
- CHECK_NA(INTERNAL, writer.Init(&bind_schema.value), error);
- CHECK_NA_DETAIL(INTERNAL, writer.InitFieldWriters(type_resolver,
&na_error),
- &na_error, error);
+ UNWRAP_ERRNO(Internal, writer.Init(&bind_schema.value));
+ UNWRAP_NANOARROW(na_error, Internal,
+ writer.InitFieldWriters(type_resolver, &na_error));
- CHECK_NA_DETAIL(INTERNAL, writer.WriteHeader(&na_error), &na_error, error);
+ UNWRAP_NANOARROW(na_error, Internal, writer.WriteHeader(&na_error));
while (true) {
- RAISE_ADBC(PullNextArray(error));
+ UNWRAP_STATUS(PullNextArray());
if (!current->release) break;
- CHECK_NA(INTERNAL, writer.SetArray(¤t.value), error);
+ UNWRAP_ERRNO(Internal, writer.SetArray(¤t.value));
// build writer buffer
int write_result;
@@ -339,42 +286,38 @@ struct BindStream {
// check if not ENODATA at exit
if (write_result != ENODATA) {
- SetError(error, "Error occurred writing COPY data: %s",
PQerrorMessage(pg_conn));
- return ADBC_STATUS_IO;
+ return Status::IO("Error occurred writing COPY data: ",
PQerrorMessage(pg_conn));
}
- RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+ UNWRAP_STATUS(FlushCopyWriterToConn(pg_conn, writer));
if (rows_affected) *rows_affected += current->length;
writer.Rewind();
}
// If there were no arrays in the stream, we haven't flushed yet
- RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+ UNWRAP_STATUS(FlushCopyWriterToConn(pg_conn, writer));
if (PQputCopyEnd(pg_conn, NULL) <= 0) {
- SetError(error, "Error message returned by PQputCopyEnd: %s",
- PQerrorMessage(pg_conn));
- return ADBC_STATUS_IO;
+ return Status::IO("Error message returned by PQputCopyEnd: ",
+ PQerrorMessage(pg_conn));
}
PGresult* result = PQgetResult(pg_conn);
ExecStatusType pg_status = PQresultStatus(result);
if (pg_status != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error, result, "[libpq] Failed to execute COPY statement:
%s %s",
- PQresStatus(pg_status), PQerrorMessage(pg_conn));
+ Status status =
+ MakeStatus(result, "[libpq] Failed to execute COPY statement: {} {}",
+ PQresStatus(pg_status), PQerrorMessage(pg_conn));
PQclear(result);
- return code;
+ return status;
}
PQclear(result);
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
- AdbcStatusCode FlushCopyWriterToConn(PGconn* pg_conn,
- const PostgresCopyStreamWriter& writer,
- struct AdbcError* error) {
+ Status FlushCopyWriterToConn(PGconn* pg_conn, const
PostgresCopyStreamWriter& writer) {
// https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
// size for a single message that we need to respect (1 GiB - 1). Since
// the buffer can be chunked up as much as we want, go for 16 MiB as our
@@ -388,14 +331,13 @@ struct BindStream {
while (remaining > 0) {
int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
if (PQputCopyData(pg_conn, data, to_write) <= 0) {
- SetError(error, "Error writing tuple field data: %s",
PQerrorMessage(pg_conn));
- return ADBC_STATUS_IO;
+ return Status::IO("Error writing tuple field data: ",
PQerrorMessage(pg_conn));
}
remaining -= to_write;
data += to_write;
}
- return ADBC_STATUS_OK;
+ return Status::Ok();
}
};
} // namespace adbcpq
diff --git a/c/driver/postgresql/result_helper.h
b/c/driver/postgresql/result_helper.h
index 0880cbc85..6dc7debf3 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -48,6 +48,8 @@ struct PqRecord {
}
return result;
}
+
+ std::string_view value() { return std::string_view(data, len); }
};
// Used by PqResultHelper to provide index-based access to the records within
each
diff --git a/c/driver/postgresql/result_reader.cc
b/c/driver/postgresql/result_reader.cc
index 7da936494..c350ab8a3 100644
--- a/c/driver/postgresql/result_reader.cc
+++ b/c/driver/postgresql/result_reader.cc
@@ -140,20 +140,11 @@ Status PqResultArrayReader::Initialize(int64_t*
rows_affected) {
// If we have to do binding, set up the bind stream an execute until
// there is a result with more than zero rows to populate.
- AdbcStatusCode status_code;
if (bind_stream_) {
- status_code = bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
- status_code =
- bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_,
&error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
+ UNWRAP_STATUS(bind_stream_->Begin([] { return Status::Ok(); }));
+ UNWRAP_STATUS(bind_stream_->SetParamTypes(conn_, *type_resolver_,
autocommit_));
UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
-
UNWRAP_STATUS(BindNextAndExecute(nullptr));
// If there were no arrays in the bind stream, we still need a result
@@ -230,28 +221,18 @@ Status PqResultArrayReader::ToArrayStream(int64_t*
affected_rows,
Status PqResultArrayReader::BindNextAndExecute(int64_t* affected_rows) {
// Keep pulling from the bind stream and executing as long as
// we receive results with zero rows.
- AdbcStatusCode status_code;
do {
- status_code = bind_stream_->EnsureNextRow(&error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
+ UNWRAP_STATUS(bind_stream_->EnsureNextRow());
if (!bind_stream_->current->release) {
- status_code = bind_stream_->Cleanup(conn_, &error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
+ UNWRAP_STATUS(bind_stream_->Cleanup(conn_));
bind_stream_.reset();
return Status::Ok();
}
PGresult* result;
- status_code = bind_stream_->BindAndExecuteCurrentRow(
- conn_, &result, /*result_format*/ kPgBinaryFormat, &error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
+ UNWRAP_STATUS(bind_stream_->BindAndExecuteCurrentRow(
+ conn_, &result, /*result_format*/ kPgBinaryFormat));
helper_.SetResult(result);
if (affected_rows) {
(*affected_rows) += helper_.AffectedRows();
@@ -265,16 +246,8 @@ Status PqResultArrayReader::ExecuteAll(int64_t*
affected_rows) {
// For the case where we don't need a result, we either need to exhaust the
bind
// stream (if there is one) or execute the query without binding.
if (bind_stream_) {
- AdbcStatusCode status_code =
- bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
- status_code =
- bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_,
&error_);
- if (status_code != ADBC_STATUS_OK) {
- return Status::FromAdbc(status_code, error_);
- }
+ UNWRAP_STATUS(bind_stream_->Begin([] { return Status::Ok(); }));
+ UNWRAP_STATUS(bind_stream_->SetParamTypes(conn_, *type_resolver_,
autocommit_));
UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
// Reset affected rows to zero before binding and executing any
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index fe30f708b..32558b494 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -612,12 +612,13 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct
ArrowArrayStream* stream,
std::memset(&bind_, 0, sizeof(bind_));
std::string escaped_table;
std::string escaped_field_list;
- RAISE_ADBC(bind_stream.Begin(
- [&]() -> AdbcStatusCode {
- return CreateBulkTable(current_schema, bind_stream.bind_schema.value,
- &escaped_table, &escaped_field_list, error);
- },
- error));
+ RAISE_STATUS(error, bind_stream.Begin([&]() -> Status {
+ struct AdbcError tmp_error = ADBC_ERROR_INIT;
+ AdbcStatusCode status_code =
+ CreateBulkTable(current_schema, bind_stream.bind_schema.value,
&escaped_table,
+ &escaped_field_list, &tmp_error);
+ return Status::FromAdbc(status_code, tmp_error);
+ }));
std::string query = "COPY ";
query += escaped_table;
@@ -634,8 +635,9 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct
ArrowArrayStream* stream,
}
PQclear(result);
- RAISE_ADBC(bind_stream.ExecuteCopy(connection_->conn(),
*connection_->type_resolver(),
- rows_affected, error));
+ RAISE_STATUS(error,
+ bind_stream.ExecuteCopy(connection_->conn(),
*connection_->type_resolver(),
+ rows_affected));
return ADBC_STATUS_OK;
}