This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 70a20030f refactor(c/driver/postgresql): Use Status for error handling 
in BindStream (#2187)
70a20030f is described below

commit 70a20030f4982dfb52c5905182c1eaabced7afc3
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Sep 25 11:14:24 2024 -0500

    refactor(c/driver/postgresql): Use Status for error handling in BindStream 
(#2187)
    
    This PR migrates the `BindStream` to use the `PqResultHelper` and
    `Status`!
    
    I believe the CI failure are still glib and Python packaging.
---
 c/driver/postgresql/bind_stream.h    | 240 +++++++++++++----------------------
 c/driver/postgresql/result_helper.h  |   2 +
 c/driver/postgresql/result_reader.cc |  43 ++-----
 c/driver/postgresql/statement.cc     |  18 +--
 4 files changed, 111 insertions(+), 192 deletions(-)

diff --git a/c/driver/postgresql/bind_stream.h 
b/c/driver/postgresql/bind_stream.h
index e5e218ea4..df0b9d2ca 100644
--- a/c/driver/postgresql/bind_stream.h
+++ b/c/driver/postgresql/bind_stream.h
@@ -27,10 +27,10 @@
 #include <arrow-adbc/adbc.h>
 
 #include "copy/writer.h"
-#include "driver/common/utils.h"
 #include "error.h"
 #include "postgres_type.h"
 #include "postgres_util.h"
+#include "result_helper.h"
 
 namespace adbcpq {
 
@@ -71,40 +71,37 @@ struct BindStream {
   }
 
   template <typename Callback>
-  AdbcStatusCode Begin(Callback&& callback, struct AdbcError* error) {
-    CHECK_NA_DETAIL(INTERNAL,
-                    ArrowArrayStreamGetSchema(&bind.value, &bind_schema.value, 
&na_error),
-                    &na_error, error);
+  Status Begin(Callback&& callback) {
+    UNWRAP_NANOARROW(
+        na_error, Internal,
+        ArrowArrayStreamGetSchema(&bind.value, &bind_schema.value, &na_error));
 
     struct ArrowSchemaView bind_schema_view;
-    CHECK_NA_DETAIL(INTERNAL,
-                    ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value, 
&na_error),
-                    &na_error, error);
+    UNWRAP_NANOARROW(
+        na_error, Internal,
+        ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value, &na_error));
     if (bind_schema_view.type != ArrowType::NANOARROW_TYPE_STRUCT) {
-      SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
-      return ADBC_STATUS_INVALID_STATE;
+      return Status::InvalidState("[libpq] Bind parameters must have type 
STRUCT");
     }
 
     bind_schema_fields.resize(bind_schema->n_children);
     for (size_t i = 0; i < bind_schema_fields.size(); i++) {
-      CHECK_NA(INTERNAL,
-               ArrowSchemaViewInit(&bind_schema_fields[i], 
bind_schema->children[i],
-                                   /*error*/ nullptr),
-               error);
+      UNWRAP_ERRNO(Internal,
+                   ArrowSchemaViewInit(&bind_schema_fields[i], 
bind_schema->children[i],
+                                       /*error*/ nullptr));
     }
 
-    CHECK_NA_DETAIL(
-        INTERNAL,
-        ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value, 
&na_error),
-        &na_error, error);
+    UNWRAP_NANOARROW(
+        na_error, Internal,
+        ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value, 
&na_error));
 
     ArrowBufferInit(&param_buffer.value);
 
     return std::move(callback)();
   }
 
-  AdbcStatusCode SetParamTypes(PGconn* pg_conn, const PostgresTypeResolver& 
type_resolver,
-                               const bool autocommit, struct AdbcError* error) 
{
+  Status SetParamTypes(PGconn* pg_conn, const PostgresTypeResolver& 
type_resolver,
+                       const bool autocommit) {
     param_types.resize(bind_schema->n_children);
     param_values.resize(bind_schema->n_children);
     param_lengths.resize(bind_schema->n_children);
@@ -113,126 +110,93 @@ struct BindStream {
 
     for (size_t i = 0; i < bind_field_writers.size(); i++) {
       PostgresType type;
-      CHECK_NA_DETAIL(INTERNAL,
-                      PostgresType::FromSchema(type_resolver, 
bind_schema->children[i],
-                                               &type, &na_error),
-                      &na_error, error);
+      UNWRAP_NANOARROW(na_error, Internal,
+                       PostgresType::FromSchema(type_resolver, 
bind_schema->children[i],
+                                                &type, &na_error));
 
       // tz-aware timestamps require special handling to set the timezone to 
UTC
       // prior to sending over the binary protocol; must be reset after execute
       if (!has_tz_field && type.type_id() == PostgresTypeId::kTimestamptz) {
-        RAISE_ADBC(SetDatabaseTimezoneUTC(pg_conn, autocommit, error));
+        UNWRAP_STATUS(SetDatabaseTimezoneUTC(pg_conn, autocommit));
         has_tz_field = true;
       }
 
       std::unique_ptr<PostgresCopyFieldWriter> writer;
-      CHECK_NA_DETAIL(
-          INTERNAL,
+      UNWRAP_NANOARROW(
+          na_error, Internal,
           MakeCopyFieldWriter(bind_schema->children[i], 
array_view->children[i],
-                              type_resolver, &writer, &na_error),
-          &na_error, error);
+                              type_resolver, &writer, &na_error));
 
       param_types[i] = type.oid();
       param_formats[i] = kPgBinaryFormat;
       bind_field_writers[i] = std::move(writer);
     }
 
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode SetDatabaseTimezoneUTC(PGconn* pg_conn, const bool autocommit,
-                                        struct AdbcError* error) {
+  Status SetDatabaseTimezoneUTC(PGconn* pg_conn, const bool autocommit) {
     if (autocommit) {
-      PGresult* begin_result = PQexec(pg_conn, "BEGIN");
-      if (PQresultStatus(begin_result) != PGRES_COMMAND_OK) {
-        AdbcStatusCode code =
-            SetError(error, begin_result,
-                     "[libpq] Failed to begin transaction for timezone data: 
%s",
-                     PQerrorMessage(pg_conn));
-        PQclear(begin_result);
-        return code;
-      }
-      PQclear(begin_result);
+      PqResultHelper helper(pg_conn, "BEGIN");
+      UNWRAP_STATUS(helper.Execute());
     }
 
-    PGresult* get_tz_result = PQexec(pg_conn, "SELECT 
current_setting('TIMEZONE')");
-    if (PQresultStatus(get_tz_result) != PGRES_TUPLES_OK) {
-      AdbcStatusCode code =
-          SetError(error, get_tz_result, "[libpq] Could not query current 
timezone: %s",
-                   PQerrorMessage(pg_conn));
-      PQclear(get_tz_result);
-      return code;
+    PqResultHelper get_tz(pg_conn, "SELECT current_setting('TIMEZONE')");
+    UNWRAP_STATUS(get_tz.Execute());
+    for (auto row : get_tz) {
+      tz_setting = row[0].value();
     }
 
-    tz_setting = std::string(PQgetvalue(get_tz_result, 0, 0));
-    PQclear(get_tz_result);
-
-    PGresult* set_utc_result = PQexec(pg_conn, "SET TIME ZONE 'UTC'");
-    if (PQresultStatus(set_utc_result) != PGRES_COMMAND_OK) {
-      AdbcStatusCode code =
-          SetError(error, set_utc_result, "[libpq] Failed to set time zone to 
UTC: %s",
-                   PQerrorMessage(pg_conn));
-      PQclear(set_utc_result);
-      return code;
-    }
-    PQclear(set_utc_result);
+    PqResultHelper set_utc(pg_conn, "SET TIME ZONE 'UTC'");
+    UNWRAP_STATUS(set_utc.Execute());
 
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode Prepare(PGconn* pg_conn, const std::string& query,
-                         struct AdbcError* error) {
-    PGresult* result = PQprepare(pg_conn, /*stmtName=*/"", query.c_str(),
-                                 /*nParams=*/bind_schema->n_children, 
param_types.data());
-    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
-      AdbcStatusCode code =
-          SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery 
was:%s",
-                   PQerrorMessage(pg_conn), query.c_str());
-      PQclear(result);
-      return code;
-    }
-    PQclear(result);
-    return ADBC_STATUS_OK;
+  Status Prepare(PGconn* pg_conn, const std::string& query) {
+    PqResultHelper helper(pg_conn, query);
+    UNWRAP_STATUS(helper.Prepare(param_types));
+    return Status::Ok();
   }
 
-  AdbcStatusCode PullNextArray(AdbcError* error) {
+  Status PullNextArray() {
     if (current->release != nullptr) ArrowArrayRelease(&current.value);
 
-    CHECK_NA_DETAIL(IO, ArrowArrayStreamGetNext(&bind.value, &current.value, 
&na_error),
-                    &na_error, error);
+    UNWRAP_NANOARROW(na_error, IO,
+                     ArrowArrayStreamGetNext(&bind.value, &current.value, 
&na_error));
 
     if (current->release != nullptr) {
-      CHECK_NA_DETAIL(
-          INTERNAL, ArrowArrayViewSetArray(&array_view.value, &current.value, 
&na_error),
-          &na_error, error);
+      UNWRAP_NANOARROW(
+          na_error, Internal,
+          ArrowArrayViewSetArray(&array_view.value, &current.value, 
&na_error));
     }
 
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode EnsureNextRow(AdbcError* error) {
+  Status EnsureNextRow() {
     if (current->release != nullptr) {
       current_row++;
       if (current_row < current->length) {
-        return ADBC_STATUS_OK;
+        return Status::Ok();
       }
     }
 
     // Pull until we have an array with at least one row or the stream is 
finished
     do {
-      RAISE_ADBC(PullNextArray(error));
+      UNWRAP_STATUS(PullNextArray());
       if (current->release == nullptr) {
         current_row = -1;
-        return ADBC_STATUS_OK;
+        return Status::Ok();
       }
     } while (current->length == 0);
 
     current_row = 0;
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode BindAndExecuteCurrentRow(PGconn* pg_conn, PGresult** 
result_out,
-                                          int result_format, AdbcError* error) 
{
+  Status BindAndExecuteCurrentRow(PGconn* pg_conn, PGresult** result_out,
+                                  int result_format) {
     param_buffer->size_bytes = 0;
     int64_t last_offset = 0;
 
@@ -240,18 +204,16 @@ struct BindStream {
       if (!ArrowArrayViewIsNull(array_view->children[col], current_row)) {
         // Note that this Write() call currently writes the (int32_t) byte 
size of the
         // field in addition to the serialized value.
-        CHECK_NA_DETAIL(
-            INTERNAL,
-            bind_field_writers[col]->Write(&param_buffer.value, current_row, 
&na_error),
-            &na_error, error);
+        UNWRAP_NANOARROW(
+            na_error, Internal,
+            bind_field_writers[col]->Write(&param_buffer.value, current_row, 
&na_error));
       } else {
-        CHECK_NA(INTERNAL, ArrowBufferAppendInt32(&param_buffer.value, 0), 
error);
+        UNWRAP_ERRNO(Internal, ArrowBufferAppendInt32(&param_buffer.value, 0));
       }
 
       int64_t param_length = param_buffer->size_bytes - last_offset - 
sizeof(int32_t);
       if (param_length > (std::numeric_limits<int>::max)()) {
-        SetError(error, "Parameter %" PRId64 " serialized to >2GB of binary", 
col);
-        return ADBC_STATUS_INTERNAL;
+        return Status::Internal("Paramter ", col, "serialized to >2GB of 
binary");
       }
 
       param_lengths[col] = static_cast<int>(param_length);
@@ -276,60 +238,45 @@ struct BindStream {
 
     ExecStatusType pg_status = PQresultStatus(result);
     if (pg_status != PGRES_COMMAND_OK && pg_status != PGRES_TUPLES_OK) {
-      AdbcStatusCode code =
-          SetError(error, result, "[libpq] Failed to execute prepared 
statement: %s %s",
-                   PQresStatus(pg_status), PQerrorMessage(pg_conn));
+      Status status =
+          MakeStatus(result, "[libpq] Failed to execute prepared statement: {} 
{}",
+                     PQresStatus(pg_status), PQerrorMessage(pg_conn));
       PQclear(result);
-      return code;
+      return status;
     }
 
     *result_out = result;
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode Cleanup(PGconn* pg_conn, AdbcError* error) {
+  Status Cleanup(PGconn* pg_conn) {
     if (has_tz_field) {
-      std::string reset_query = "SET TIME ZONE '" + tz_setting + "'";
-      PGresult* reset_tz_result = PQexec(pg_conn, reset_query.c_str());
-      if (PQresultStatus(reset_tz_result) != PGRES_COMMAND_OK) {
-        AdbcStatusCode code =
-            SetError(error, reset_tz_result, "[libpq] Failed to reset time 
zone: %s",
-                     PQerrorMessage(pg_conn));
-        PQclear(reset_tz_result);
-        return code;
-      }
-      PQclear(reset_tz_result);
-
-      PGresult* commit_result = PQexec(pg_conn, "COMMIT");
-      if (PQresultStatus(commit_result) != PGRES_COMMAND_OK) {
-        AdbcStatusCode code =
-            SetError(error, commit_result, "[libpq] Failed to commit 
transaction: %s",
-                     PQerrorMessage(pg_conn));
-        PQclear(commit_result);
-        return code;
-      }
-      PQclear(commit_result);
+      PqResultHelper reset(pg_conn, "SET TIME ZONE '" + tz_setting + "'");
+      UNWRAP_STATUS(reset.Execute());
+
+      PqResultHelper commit(pg_conn, "COMMIT");
+      UNWRAP_STATUS(reset.Execute());
     }
 
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver& 
type_resolver,
-                             int64_t* rows_affected, struct AdbcError* error) {
+  Status ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver& 
type_resolver,
+                     int64_t* rows_affected) {
     if (rows_affected) *rows_affected = 0;
 
     PostgresCopyStreamWriter writer;
-    CHECK_NA(INTERNAL, writer.Init(&bind_schema.value), error);
-    CHECK_NA_DETAIL(INTERNAL, writer.InitFieldWriters(type_resolver, 
&na_error),
-                    &na_error, error);
+    UNWRAP_ERRNO(Internal, writer.Init(&bind_schema.value));
+    UNWRAP_NANOARROW(na_error, Internal,
+                     writer.InitFieldWriters(type_resolver, &na_error));
 
-    CHECK_NA_DETAIL(INTERNAL, writer.WriteHeader(&na_error), &na_error, error);
+    UNWRAP_NANOARROW(na_error, Internal, writer.WriteHeader(&na_error));
 
     while (true) {
-      RAISE_ADBC(PullNextArray(error));
+      UNWRAP_STATUS(PullNextArray());
       if (!current->release) break;
 
-      CHECK_NA(INTERNAL, writer.SetArray(&current.value), error);
+      UNWRAP_ERRNO(Internal, writer.SetArray(&current.value));
 
       // build writer buffer
       int write_result;
@@ -339,42 +286,38 @@ struct BindStream {
 
       // check if not ENODATA at exit
       if (write_result != ENODATA) {
-        SetError(error, "Error occurred writing COPY data: %s", 
PQerrorMessage(pg_conn));
-        return ADBC_STATUS_IO;
+        return Status::IO("Error occurred writing COPY data: ", 
PQerrorMessage(pg_conn));
       }
 
-      RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+      UNWRAP_STATUS(FlushCopyWriterToConn(pg_conn, writer));
 
       if (rows_affected) *rows_affected += current->length;
       writer.Rewind();
     }
 
     // If there were no arrays in the stream, we haven't flushed yet
-    RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+    UNWRAP_STATUS(FlushCopyWriterToConn(pg_conn, writer));
 
     if (PQputCopyEnd(pg_conn, NULL) <= 0) {
-      SetError(error, "Error message returned by PQputCopyEnd: %s",
-               PQerrorMessage(pg_conn));
-      return ADBC_STATUS_IO;
+      return Status::IO("Error message returned by PQputCopyEnd: ",
+                        PQerrorMessage(pg_conn));
     }
 
     PGresult* result = PQgetResult(pg_conn);
     ExecStatusType pg_status = PQresultStatus(result);
     if (pg_status != PGRES_COMMAND_OK) {
-      AdbcStatusCode code =
-          SetError(error, result, "[libpq] Failed to execute COPY statement: 
%s %s",
-                   PQresStatus(pg_status), PQerrorMessage(pg_conn));
+      Status status =
+          MakeStatus(result, "[libpq] Failed to execute COPY statement: {} {}",
+                     PQresStatus(pg_status), PQerrorMessage(pg_conn));
       PQclear(result);
-      return code;
+      return status;
     }
 
     PQclear(result);
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 
-  AdbcStatusCode FlushCopyWriterToConn(PGconn* pg_conn,
-                                       const PostgresCopyStreamWriter& writer,
-                                       struct AdbcError* error) {
+  Status FlushCopyWriterToConn(PGconn* pg_conn, const 
PostgresCopyStreamWriter& writer) {
     // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
     // size for a single message that we need to respect (1 GiB - 1).  Since
     // the buffer can be chunked up as much as we want, go for 16 MiB as our
@@ -388,14 +331,13 @@ struct BindStream {
     while (remaining > 0) {
       int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
       if (PQputCopyData(pg_conn, data, to_write) <= 0) {
-        SetError(error, "Error writing tuple field data: %s", 
PQerrorMessage(pg_conn));
-        return ADBC_STATUS_IO;
+        return Status::IO("Error writing tuple field data: ", 
PQerrorMessage(pg_conn));
       }
       remaining -= to_write;
       data += to_write;
     }
 
-    return ADBC_STATUS_OK;
+    return Status::Ok();
   }
 };
 }  // namespace adbcpq
diff --git a/c/driver/postgresql/result_helper.h 
b/c/driver/postgresql/result_helper.h
index 0880cbc85..6dc7debf3 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -48,6 +48,8 @@ struct PqRecord {
     }
     return result;
   }
+
+  std::string_view value() { return std::string_view(data, len); }
 };
 
 // Used by PqResultHelper to provide index-based access to the records within 
each
diff --git a/c/driver/postgresql/result_reader.cc 
b/c/driver/postgresql/result_reader.cc
index 7da936494..c350ab8a3 100644
--- a/c/driver/postgresql/result_reader.cc
+++ b/c/driver/postgresql/result_reader.cc
@@ -140,20 +140,11 @@ Status PqResultArrayReader::Initialize(int64_t* 
rows_affected) {
 
   // If we have to do binding, set up the bind stream an execute until
   // there is a result with more than zero rows to populate.
-  AdbcStatusCode status_code;
   if (bind_stream_) {
-    status_code = bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
-    status_code =
-        bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_, 
&error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
+    UNWRAP_STATUS(bind_stream_->Begin([] { return Status::Ok(); }));
 
+    UNWRAP_STATUS(bind_stream_->SetParamTypes(conn_, *type_resolver_, 
autocommit_));
     UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
-
     UNWRAP_STATUS(BindNextAndExecute(nullptr));
 
     // If there were no arrays in the bind stream, we still need a result
@@ -230,28 +221,18 @@ Status PqResultArrayReader::ToArrayStream(int64_t* 
affected_rows,
 Status PqResultArrayReader::BindNextAndExecute(int64_t* affected_rows) {
   // Keep pulling from the bind stream and executing as long as
   // we receive results with zero rows.
-  AdbcStatusCode status_code;
   do {
-    status_code = bind_stream_->EnsureNextRow(&error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
+    UNWRAP_STATUS(bind_stream_->EnsureNextRow());
 
     if (!bind_stream_->current->release) {
-      status_code = bind_stream_->Cleanup(conn_, &error_);
-      if (status_code != ADBC_STATUS_OK) {
-        return Status::FromAdbc(status_code, error_);
-      }
+      UNWRAP_STATUS(bind_stream_->Cleanup(conn_));
       bind_stream_.reset();
       return Status::Ok();
     }
 
     PGresult* result;
-    status_code = bind_stream_->BindAndExecuteCurrentRow(
-        conn_, &result, /*result_format*/ kPgBinaryFormat, &error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
+    UNWRAP_STATUS(bind_stream_->BindAndExecuteCurrentRow(
+        conn_, &result, /*result_format*/ kPgBinaryFormat));
     helper_.SetResult(result);
     if (affected_rows) {
       (*affected_rows) += helper_.AffectedRows();
@@ -265,16 +246,8 @@ Status PqResultArrayReader::ExecuteAll(int64_t* 
affected_rows) {
   // For the case where we don't need a result, we either need to exhaust the 
bind
   // stream (if there is one) or execute the query without binding.
   if (bind_stream_) {
-    AdbcStatusCode status_code =
-        bind_stream_->Begin([] { return ADBC_STATUS_OK; }, &error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
-    status_code =
-        bind_stream_->SetParamTypes(conn_, *type_resolver_, autocommit_, 
&error_);
-    if (status_code != ADBC_STATUS_OK) {
-      return Status::FromAdbc(status_code, error_);
-    }
+    UNWRAP_STATUS(bind_stream_->Begin([] { return Status::Ok(); }));
+    UNWRAP_STATUS(bind_stream_->SetParamTypes(conn_, *type_resolver_, 
autocommit_));
     UNWRAP_STATUS(helper_.Prepare(bind_stream_->param_types));
 
     // Reset affected rows to zero before binding and executing any
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index fe30f708b..32558b494 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -612,12 +612,13 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct 
ArrowArrayStream* stream,
   std::memset(&bind_, 0, sizeof(bind_));
   std::string escaped_table;
   std::string escaped_field_list;
-  RAISE_ADBC(bind_stream.Begin(
-      [&]() -> AdbcStatusCode {
-        return CreateBulkTable(current_schema, bind_stream.bind_schema.value,
-                               &escaped_table, &escaped_field_list, error);
-      },
-      error));
+  RAISE_STATUS(error, bind_stream.Begin([&]() -> Status {
+    struct AdbcError tmp_error = ADBC_ERROR_INIT;
+    AdbcStatusCode status_code =
+        CreateBulkTable(current_schema, bind_stream.bind_schema.value, 
&escaped_table,
+                        &escaped_field_list, &tmp_error);
+    return Status::FromAdbc(status_code, tmp_error);
+  }));
 
   std::string query = "COPY ";
   query += escaped_table;
@@ -634,8 +635,9 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct 
ArrowArrayStream* stream,
   }
 
   PQclear(result);
-  RAISE_ADBC(bind_stream.ExecuteCopy(connection_->conn(), 
*connection_->type_resolver(),
-                                     rows_affected, error));
+  RAISE_STATUS(error,
+               bind_stream.ExecuteCopy(connection_->conn(), 
*connection_->type_resolver(),
+                                       rows_affected));
   return ADBC_STATUS_OK;
 }
 

Reply via email to