This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 83f0768af feat(c/driver/postgresql): Support queries that bind 
parameters and return a result (#2065)
83f0768af is described below

commit 83f0768affaa27b4e413107d95de43e3c2d8c789
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Aug 14 20:52:43 2024 -0300

    feat(c/driver/postgresql): Support queries that bind parameters and return 
a result (#2065)
    
    This PR allows statements with a bind stream to return a result. This is
    not trivial because it requires concatenating results from multiple rows
    in a bind stream into one output stream. The existing bind stream had
    been wired up to pull and loop in one big function, but we need
    something with that knows which array/row it's on and can "bind and
    execute" the next one. I think I did this successfully without modifying
    the existing binding behaviour (which could maybe use the copy writer at
    some point in the future for better type support).
    
    The behaviour of the "result reader" now incorporates the bind stream:
    
    - When the result reader is being created/initialized, it executes
    eagerly (the query if there are no bind parameters, and the query for
    all the bind parameters that return zero-row results or until the bind
    stream is exhausted otherwise). If there is no `out` array stream, it
    will exhaust the bind stream before returning. This mirrors what
    currently happens and I think is more of what a user would expect;
    however, it does mean that a user would have to remember to fully
    consume the output array stream to ensure that the full bind stream was
    exhausted (if they specified an output stream in the first place).
    - On each call to `GetNext()` of the result, we check for an existing
    `PGresult` to convert, and try to regenerate it if it's absent (==
    already been converted to an array and returned) using a new value from
    the bind stream. When pulling from the bind stream we pull everything we
    can until we get something with at least one row.
    
    Much existing behaviour (update, create table, insert, delete) flows
    through the first bullet, and I believe this has some more correct
    behaviour for the case where an insert or update or delete updates a
    number of rows that isn't one and/or there is more than one value in the
    bind stream.
    
    ``` r
    library(adbcdrivermanager)
    
    con <- adbc_database_init(
      adbcpostgresql::adbcpostgresql(),
      uri = 
"postgresql://localhost:5432/postgres?user=postgres&password=password"
    ) |>
      adbc_connection_init()
    
    read_adbc(con, "SELECT $1 as x", bind = data.frame(x = c("a", NA, "b", 
"c"))) |>
      as.data.frame()
    #>      x
    #> 1    a
    #> 2 <NA>
    #> 3    b
    #> 4    c
    
    execute_adbc(con, "DROP TABLE IF EXISTS integers")
    write_adbc(data.frame(x = 1:10), con, "integers")
    
    read_adbc(con, "SELECT * from integers where x > $1", bind = data.frame(x = 
8:10)) |>
      as.data.frame()
    #>    x
    #> 1  9
    #> 2 10
    #> 3 10
    ```
    
    <sup>Created on 2024-08-13 with [reprex
    v2.0.2](https://reprex.tidyverse.org)</sup>
    
    Closes #2024.
---
 c/driver/postgresql/CMakeLists.txt     |   1 +
 c/driver/postgresql/bind_stream.h      | 615 ++++++++++++++++++++++++++++++++
 c/driver/postgresql/meson.build        |   1 +
 c/driver/postgresql/postgresql_test.cc | 134 +++++++
 c/driver/postgresql/result_helper.cc   | 154 --------
 c/driver/postgresql/result_helper.h    |  52 +--
 c/driver/postgresql/result_reader.cc   | 276 ++++++++++++++
 c/driver/postgresql/result_reader.h    |  92 +++++
 c/driver/postgresql/statement.cc       | 633 +--------------------------------
 r/adbcpostgresql/src/Makevars.in       |   1 +
 r/adbcpostgresql/src/Makevars.ucrt     |   1 +
 r/adbcpostgresql/src/Makevars.win      |   1 +
 12 files changed, 1137 insertions(+), 824 deletions(-)

diff --git a/c/driver/postgresql/CMakeLists.txt 
b/c/driver/postgresql/CMakeLists.txt
index 8a004e0d0..a720696c6 100644
--- a/c/driver/postgresql/CMakeLists.txt
+++ b/c/driver/postgresql/CMakeLists.txt
@@ -33,6 +33,7 @@ add_arrow_lib(adbc_driver_postgresql
               database.cc
               postgresql.cc
               result_helper.cc
+              result_reader.cc
               statement.cc
               OUTPUTS
               ADBC_LIBRARIES
diff --git a/c/driver/postgresql/bind_stream.h 
b/c/driver/postgresql/bind_stream.h
new file mode 100644
index 000000000..3e440e800
--- /dev/null
+++ b/c/driver/postgresql/bind_stream.h
@@ -0,0 +1,615 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <limits>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <arrow-adbc/adbc.h>
+
+#include "copy/writer.h"
+#include "driver/common/utils.h"
+#include "error.h"
+#include "postgres_type.h"
+#include "postgres_util.h"
+
+namespace adbcpq {
+
+/// The flag indicating to PostgreSQL that we want binary-format values.
+constexpr int kPgBinaryFormat = 1;
+
+/// Helper to manage bind parameters with a prepared statement
+struct BindStream {
+  Handle<struct ArrowArrayStream> bind;
+  Handle<struct ArrowArrayView> array_view;
+  Handle<struct ArrowArray> current;
+  Handle<struct ArrowSchema> bind_schema;
+  int64_t current_row = -1;
+
+  struct ArrowSchemaView bind_schema_view;
+  std::vector<struct ArrowSchemaView> bind_schema_fields;
+
+  // OIDs for parameter types
+  std::vector<uint32_t> param_types;
+  std::vector<char*> param_values;
+  std::vector<int> param_lengths;
+  std::vector<int> param_formats;
+  std::vector<size_t> param_values_offsets;
+  std::vector<char> param_values_buffer;
+  // XXX: this assumes fixed-length fields only - will need more
+  // consideration to deal with variable-length fields
+
+  bool has_tz_field = false;
+  std::string tz_setting;
+
+  struct ArrowError na_error;
+
+  BindStream() {
+    this->bind->release = nullptr;
+    std::memset(&na_error, 0, sizeof(na_error));
+  }
+
+  void SetBind(struct ArrowArrayStream* stream) {
+    this->bind.reset();
+    ArrowArrayStreamMove(stream, &bind.value);
+  }
+
+  template <typename Callback>
+  AdbcStatusCode Begin(Callback&& callback, struct AdbcError* error) {
+    CHECK_NA_DETAIL(INTERNAL,
+                    ArrowArrayStreamGetSchema(&bind.value, &bind_schema.value, 
&na_error),
+                    &na_error, error);
+    CHECK_NA_DETAIL(INTERNAL,
+                    ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value, 
&na_error),
+                    &na_error, error);
+
+    if (bind_schema_view.type != ArrowType::NANOARROW_TYPE_STRUCT) {
+      SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+
+    bind_schema_fields.resize(bind_schema->n_children);
+    for (size_t i = 0; i < bind_schema_fields.size(); i++) {
+      CHECK_NA(INTERNAL,
+               ArrowSchemaViewInit(&bind_schema_fields[i], 
bind_schema->children[i],
+                                   /*error*/ nullptr),
+               error);
+    }
+
+    CHECK_NA_DETAIL(
+        INTERNAL,
+        ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value, 
&na_error),
+        &na_error, error);
+
+    return std::move(callback)();
+  }
+
+  AdbcStatusCode SetParamTypes(const PostgresTypeResolver& type_resolver,
+                               struct AdbcError* error) {
+    param_types.resize(bind_schema->n_children);
+    param_values.resize(bind_schema->n_children);
+    param_lengths.resize(bind_schema->n_children);
+    param_formats.resize(bind_schema->n_children, kPgBinaryFormat);
+    param_values_offsets.reserve(bind_schema->n_children);
+
+    for (size_t i = 0; i < bind_schema_fields.size(); i++) {
+      PostgresTypeId type_id;
+      switch (bind_schema_fields[i].type) {
+        case ArrowType::NANOARROW_TYPE_BOOL:
+          type_id = PostgresTypeId::kBool;
+          param_lengths[i] = 1;
+          break;
+        case ArrowType::NANOARROW_TYPE_INT8:
+        case ArrowType::NANOARROW_TYPE_INT16:
+          type_id = PostgresTypeId::kInt2;
+          param_lengths[i] = 2;
+          break;
+        case ArrowType::NANOARROW_TYPE_INT32:
+          type_id = PostgresTypeId::kInt4;
+          param_lengths[i] = 4;
+          break;
+        case ArrowType::NANOARROW_TYPE_INT64:
+          type_id = PostgresTypeId::kInt8;
+          param_lengths[i] = 8;
+          break;
+        case ArrowType::NANOARROW_TYPE_FLOAT:
+          type_id = PostgresTypeId::kFloat4;
+          param_lengths[i] = 4;
+          break;
+        case ArrowType::NANOARROW_TYPE_DOUBLE:
+          type_id = PostgresTypeId::kFloat8;
+          param_lengths[i] = 8;
+          break;
+        case ArrowType::NANOARROW_TYPE_STRING:
+        case ArrowType::NANOARROW_TYPE_LARGE_STRING:
+          type_id = PostgresTypeId::kText;
+          param_lengths[i] = 0;
+          break;
+        case ArrowType::NANOARROW_TYPE_BINARY:
+          type_id = PostgresTypeId::kBytea;
+          param_lengths[i] = 0;
+          break;
+        case ArrowType::NANOARROW_TYPE_DATE32:
+          type_id = PostgresTypeId::kDate;
+          param_lengths[i] = 4;
+          break;
+        case ArrowType::NANOARROW_TYPE_TIMESTAMP:
+          type_id = PostgresTypeId::kTimestamp;
+          param_lengths[i] = 8;
+          break;
+        case ArrowType::NANOARROW_TYPE_DURATION:
+        case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+          type_id = PostgresTypeId::kInterval;
+          param_lengths[i] = 16;
+          break;
+        case ArrowType::NANOARROW_TYPE_DECIMAL128:
+        case ArrowType::NANOARROW_TYPE_DECIMAL256:
+          type_id = PostgresTypeId::kNumeric;
+          param_lengths[i] = 0;
+          break;
+        case ArrowType::NANOARROW_TYPE_DICTIONARY: {
+          struct ArrowSchemaView value_view;
+          CHECK_NA(INTERNAL,
+                   ArrowSchemaViewInit(&value_view, 
bind_schema->children[i]->dictionary,
+                                       nullptr),
+                   error);
+          switch (value_view.type) {
+            case NANOARROW_TYPE_BINARY:
+            case NANOARROW_TYPE_LARGE_BINARY:
+              type_id = PostgresTypeId::kBytea;
+              param_lengths[i] = 0;
+              break;
+            case NANOARROW_TYPE_STRING:
+            case NANOARROW_TYPE_LARGE_STRING:
+              type_id = PostgresTypeId::kText;
+              param_lengths[i] = 0;
+              break;
+            default:
+              SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
+                       static_cast<uint64_t>(i + 1), " ('",
+                       bind_schema->children[i]->name,
+                       "') has unsupported dictionary value parameter type ",
+                       ArrowTypeString(value_view.type));
+              return ADBC_STATUS_NOT_IMPLEMENTED;
+          }
+          break;
+        }
+        default:
+          SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
+                   static_cast<uint64_t>(i + 1), " ('", 
bind_schema->children[i]->name,
+                   "') has unsupported parameter type ",
+                   ArrowTypeString(bind_schema_fields[i].type));
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+
+      param_types[i] = type_resolver.GetOID(type_id);
+      if (param_types[i] == 0) {
+        SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
+                 static_cast<uint64_t>(i + 1), " ('", 
bind_schema->children[i]->name,
+                 "') has type with no corresponding PostgreSQL type ",
+                 ArrowTypeString(bind_schema_fields[i].type));
+        return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+    }
+
+    size_t param_values_length = 0;
+    for (int length : param_lengths) {
+      param_values_offsets.push_back(param_values_length);
+      param_values_length += length;
+    }
+    param_values_buffer.resize(param_values_length);
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode Prepare(PGconn* pg_conn, const std::string& query,
+                         struct AdbcError* error, const bool autocommit) {
+    // tz-aware timestamps require special handling to set the timezone to UTC
+    // prior to sending over the binary protocol; must be reset after execute
+    for (int64_t col = 0; col < bind_schema->n_children; col++) {
+      if ((bind_schema_fields[col].type == 
ArrowType::NANOARROW_TYPE_TIMESTAMP) &&
+          (strcmp("", bind_schema_fields[col].timezone))) {
+        has_tz_field = true;
+
+        if (autocommit) {
+          PGresult* begin_result = PQexec(pg_conn, "BEGIN");
+          if (PQresultStatus(begin_result) != PGRES_COMMAND_OK) {
+            AdbcStatusCode code =
+                SetError(error, begin_result,
+                         "[libpq] Failed to begin transaction for timezone 
data: %s",
+                         PQerrorMessage(pg_conn));
+            PQclear(begin_result);
+            return code;
+          }
+          PQclear(begin_result);
+        }
+
+        PGresult* get_tz_result = PQexec(pg_conn, "SELECT 
current_setting('TIMEZONE')");
+        if (PQresultStatus(get_tz_result) != PGRES_TUPLES_OK) {
+          AdbcStatusCode code = SetError(error, get_tz_result,
+                                         "[libpq] Could not query current 
timezone: %s",
+                                         PQerrorMessage(pg_conn));
+          PQclear(get_tz_result);
+          return code;
+        }
+
+        tz_setting = std::string(PQgetvalue(get_tz_result, 0, 0));
+        PQclear(get_tz_result);
+
+        PGresult* set_utc_result = PQexec(pg_conn, "SET TIME ZONE 'UTC'");
+        if (PQresultStatus(set_utc_result) != PGRES_COMMAND_OK) {
+          AdbcStatusCode code = SetError(error, set_utc_result,
+                                         "[libpq] Failed to set time zone to 
UTC: %s",
+                                         PQerrorMessage(pg_conn));
+          PQclear(set_utc_result);
+          return code;
+        }
+        PQclear(set_utc_result);
+        break;
+      }
+    }
+
+    PGresult* result = PQprepare(pg_conn, /*stmtName=*/"", query.c_str(),
+                                 /*nParams=*/bind_schema->n_children, 
param_types.data());
+    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+      AdbcStatusCode code =
+          SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery 
was:%s",
+                   PQerrorMessage(pg_conn), query.c_str());
+      PQclear(result);
+      return code;
+    }
+    PQclear(result);
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode PullNextArray(AdbcError* error) {
+    if (current->release != nullptr) ArrowArrayRelease(&current.value);
+
+    CHECK_NA_DETAIL(IO, ArrowArrayStreamGetNext(&bind.value, &current.value, 
&na_error),
+                    &na_error, error);
+
+    if (current->release != nullptr) {
+      CHECK_NA_DETAIL(
+          INTERNAL, ArrowArrayViewSetArray(&array_view.value, &current.value, 
&na_error),
+          &na_error, error);
+    }
+
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode EnsureNextRow(AdbcError* error) {
+    if (current->release != nullptr) {
+      current_row++;
+      if (current_row < current->length) {
+        return ADBC_STATUS_OK;
+      }
+    }
+
+    // Pull until we have an array with at least one row or the stream is 
finished
+    do {
+      RAISE_ADBC(PullNextArray(error));
+      if (current->release == nullptr) {
+        current_row = -1;
+        return ADBC_STATUS_OK;
+      }
+    } while (current->length == 0);
+
+    current_row = 0;
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode BindAndExecuteCurrentRow(PGconn* pg_conn, PGresult** 
result_out,
+                                          int result_format, AdbcError* error) 
{
+    int64_t row = current_row;
+
+    for (int64_t col = 0; col < array_view->n_children; col++) {
+      if (ArrowArrayViewIsNull(array_view->children[col], row)) {
+        param_values[col] = nullptr;
+        continue;
+      } else {
+        param_values[col] = param_values_buffer.data() + 
param_values_offsets[col];
+      }
+      switch (bind_schema_fields[col].type) {
+        case ArrowType::NANOARROW_TYPE_BOOL: {
+          const int8_t val =
+              
ArrowBitGet(array_view->children[col]->buffer_views[1].data.as_uint8, row);
+          std::memcpy(param_values[col], &val, sizeof(int8_t));
+          break;
+        }
+
+        case ArrowType::NANOARROW_TYPE_INT8: {
+          const int16_t val =
+              array_view->children[col]->buffer_views[1].data.as_int8[row];
+          const uint16_t value = ToNetworkInt16(val);
+          std::memcpy(param_values[col], &value, sizeof(int16_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_INT16: {
+          const uint16_t value = ToNetworkInt16(
+              array_view->children[col]->buffer_views[1].data.as_int16[row]);
+          std::memcpy(param_values[col], &value, sizeof(int16_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_INT32: {
+          const uint32_t value = ToNetworkInt32(
+              array_view->children[col]->buffer_views[1].data.as_int32[row]);
+          std::memcpy(param_values[col], &value, sizeof(int32_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_INT64: {
+          const int64_t value = ToNetworkInt64(
+              array_view->children[col]->buffer_views[1].data.as_int64[row]);
+          std::memcpy(param_values[col], &value, sizeof(int64_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_FLOAT: {
+          const uint32_t value = ToNetworkFloat4(
+              array_view->children[col]->buffer_views[1].data.as_float[row]);
+          std::memcpy(param_values[col], &value, sizeof(uint32_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_DOUBLE: {
+          const uint64_t value = ToNetworkFloat8(
+              array_view->children[col]->buffer_views[1].data.as_double[row]);
+          std::memcpy(param_values[col], &value, sizeof(uint64_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_STRING:
+        case ArrowType::NANOARROW_TYPE_LARGE_STRING:
+        case ArrowType::NANOARROW_TYPE_BINARY: {
+          const ArrowBufferView view =
+              ArrowArrayViewGetBytesUnsafe(array_view->children[col], row);
+          // TODO: overflow check?
+          param_lengths[col] = static_cast<int>(view.size_bytes);
+          param_values[col] = const_cast<char*>(view.data.as_char);
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_DATE32: {
+          // 2000-01-01
+          constexpr int32_t kPostgresDateEpoch = 10957;
+          const int32_t raw_value =
+              array_view->children[col]->buffer_views[1].data.as_int32[row];
+          if (raw_value < INT32_MIN + kPostgresDateEpoch) {
+            SetError(error, "[libpq] Field #%" PRId64 "%s%s%s%" PRId64 "%s", 
col + 1,
+                     "('", bind_schema->children[col]->name, "') Row #", row + 
1,
+                     "has value which exceeds postgres date limits");
+            return ADBC_STATUS_INVALID_ARGUMENT;
+          }
+
+          const uint32_t value = ToNetworkInt32(raw_value - 
kPostgresDateEpoch);
+          std::memcpy(param_values[col], &value, sizeof(int32_t));
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_DURATION:
+        case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
+          int64_t val = 
array_view->children[col]->buffer_views[1].data.as_int64[row];
+
+          bool overflow_safe = true;
+
+          auto unit = bind_schema_fields[col].time_unit;
+
+          switch (unit) {
+            case NANOARROW_TIME_UNIT_SECOND:
+              overflow_safe =
+                  val <= kMaxSafeSecondsToMicros && val >= 
kMinSafeSecondsToMicros;
+              if (overflow_safe) {
+                val *= 1000000;
+              }
+
+              break;
+            case NANOARROW_TIME_UNIT_MILLI:
+              overflow_safe =
+                  val <= kMaxSafeMillisToMicros && val >= 
kMinSafeMillisToMicros;
+              if (overflow_safe) {
+                val *= 1000;
+              }
+              break;
+            case NANOARROW_TIME_UNIT_MICRO:
+              break;
+            case NANOARROW_TIME_UNIT_NANO:
+              val /= 1000;
+              break;
+          }
+
+          if (!overflow_safe) {
+            SetError(error,
+                     "[libpq] Field #%" PRId64 " ('%s') Row #%" PRId64
+                     " has value '%" PRIi64 "' which exceeds PostgreSQL 
timestamp limits",
+                     col + 1, bind_schema->children[col]->name, row + 1,
+                     
array_view->children[col]->buffer_views[1].data.as_int64[row]);
+            return ADBC_STATUS_INVALID_ARGUMENT;
+          }
+
+          if (val < (std::numeric_limits<int64_t>::min)() + 
kPostgresTimestampEpoch) {
+            SetError(error,
+                     "[libpq] Field #%" PRId64 " ('%s') Row #%" PRId64
+                     " has value '%" PRIi64 "' which would underflow",
+                     col + 1, bind_schema->children[col]->name, row + 1,
+                     
array_view->children[col]->buffer_views[1].data.as_int64[row]);
+            return ADBC_STATUS_INVALID_ARGUMENT;
+          }
+
+          if (bind_schema_fields[col].type == 
ArrowType::NANOARROW_TYPE_TIMESTAMP) {
+            const uint64_t value = ToNetworkInt64(val - 
kPostgresTimestampEpoch);
+            std::memcpy(param_values[col], &value, sizeof(int64_t));
+          } else if (bind_schema_fields[col].type == 
ArrowType::NANOARROW_TYPE_DURATION) {
+            // postgres stores an interval as a 64 bit offset in microsecond
+            // resolution alongside a 32 bit day and 32 bit month
+            // for now we just send 0 for the day / month values
+            const uint64_t value = ToNetworkInt64(val);
+            std::memcpy(param_values[col], &value, sizeof(int64_t));
+            std::memset(param_values[col] + sizeof(int64_t), 0, 
sizeof(int64_t));
+          }
+          break;
+        }
+        case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: {
+          struct ArrowInterval interval;
+          ArrowIntervalInit(&interval, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+          ArrowArrayViewGetIntervalUnsafe(array_view->children[col], row, 
&interval);
+
+          const uint32_t months = ToNetworkInt32(interval.months);
+          const uint32_t days = ToNetworkInt32(interval.days);
+          const uint64_t ms = ToNetworkInt64(interval.ns / 1000);
+
+          std::memcpy(param_values[col], &ms, sizeof(uint64_t));
+          std::memcpy(param_values[col] + sizeof(uint64_t), &days, 
sizeof(uint32_t));
+          std::memcpy(param_values[col] + sizeof(uint64_t) + sizeof(uint32_t), 
&months,
+                      sizeof(uint32_t));
+          break;
+        }
+        default:
+          SetError(error, "%s%" PRId64 "%s%s%s%s", "[libpq] Field #", col + 1, 
" ('",
+                   bind_schema->children[col]->name,
+                   "') has unsupported type for ingestion ",
+                   ArrowTypeString(bind_schema_fields[col].type));
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+    }
+
+    PGresult* result =
+        PQexecPrepared(pg_conn, /*stmtName=*/"",
+                       /*nParams=*/bind_schema->n_children, 
param_values.data(),
+                       param_lengths.data(), param_formats.data(), 
result_format);
+
+    ExecStatusType pg_status = PQresultStatus(result);
+    if (pg_status != PGRES_COMMAND_OK && pg_status != PGRES_TUPLES_OK) {
+      AdbcStatusCode code =
+          SetError(error, result, "[libpq] Failed to execute prepared 
statement: %s %s",
+                   PQresStatus(pg_status), PQerrorMessage(pg_conn));
+      PQclear(result);
+      return code;
+    }
+
+    *result_out = result;
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode Cleanup(PGconn* pg_conn, AdbcError* error) {
+    if (has_tz_field) {
+      std::string reset_query = "SET TIME ZONE '" + tz_setting + "'";
+      PGresult* reset_tz_result = PQexec(pg_conn, reset_query.c_str());
+      if (PQresultStatus(reset_tz_result) != PGRES_COMMAND_OK) {
+        AdbcStatusCode code =
+            SetError(error, reset_tz_result, "[libpq] Failed to reset time 
zone: %s",
+                     PQerrorMessage(pg_conn));
+        PQclear(reset_tz_result);
+        return code;
+      }
+      PQclear(reset_tz_result);
+
+      PGresult* commit_result = PQexec(pg_conn, "COMMIT");
+      if (PQresultStatus(commit_result) != PGRES_COMMAND_OK) {
+        AdbcStatusCode code =
+            SetError(error, commit_result, "[libpq] Failed to commit 
transaction: %s",
+                     PQerrorMessage(pg_conn));
+        PQclear(commit_result);
+        return code;
+      }
+      PQclear(commit_result);
+    }
+
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver& 
type_resolver,
+                             int64_t* rows_affected, struct AdbcError* error) {
+    if (rows_affected) *rows_affected = 0;
+
+    PostgresCopyStreamWriter writer;
+    CHECK_NA(INTERNAL, writer.Init(&bind_schema.value), error);
+    CHECK_NA_DETAIL(INTERNAL, writer.InitFieldWriters(type_resolver, 
&na_error),
+                    &na_error, error);
+
+    CHECK_NA_DETAIL(INTERNAL, writer.WriteHeader(&na_error), &na_error, error);
+
+    while (true) {
+      RAISE_ADBC(PullNextArray(error));
+      if (!current->release) break;
+
+      CHECK_NA(INTERNAL, writer.SetArray(&current.value), error);
+
+      // build writer buffer
+      int write_result;
+      do {
+        write_result = writer.WriteRecord(&na_error);
+      } while (write_result == NANOARROW_OK);
+
+      // check if not ENODATA at exit
+      if (write_result != ENODATA) {
+        SetError(error, "Error occurred writing COPY data: %s", 
PQerrorMessage(pg_conn));
+        return ADBC_STATUS_IO;
+      }
+
+      RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+
+      if (rows_affected) *rows_affected += current->length;
+      writer.Rewind();
+    }
+
+    // If there were no arrays in the stream, we haven't flushed yet
+    RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+
+    if (PQputCopyEnd(pg_conn, NULL) <= 0) {
+      SetError(error, "Error message returned by PQputCopyEnd: %s",
+               PQerrorMessage(pg_conn));
+      return ADBC_STATUS_IO;
+    }
+
+    PGresult* result = PQgetResult(pg_conn);
+    ExecStatusType pg_status = PQresultStatus(result);
+    if (pg_status != PGRES_COMMAND_OK) {
+      AdbcStatusCode code =
+          SetError(error, result, "[libpq] Failed to execute COPY statement: 
%s %s",
+                   PQresStatus(pg_status), PQerrorMessage(pg_conn));
+      PQclear(result);
+      return code;
+    }
+
+    PQclear(result);
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode FlushCopyWriterToConn(PGconn* pg_conn,
+                                       const PostgresCopyStreamWriter& writer,
+                                       struct AdbcError* error) {
+    // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
+    // size for a single message that we need to respect (1 GiB - 1).  Since
+    // the buffer can be chunked up as much as we want, go for 16 MiB as our
+    // limit.
+    // 
https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
+    constexpr int64_t kMaxCopyBufferSize = 0x1000000;
+    ArrowBuffer buffer = writer.WriteBuffer();
+
+    auto* data = reinterpret_cast<char*>(buffer.data);
+    int64_t remaining = buffer.size_bytes;
+    while (remaining > 0) {
+      int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
+      if (PQputCopyData(pg_conn, data, to_write) <= 0) {
+        SetError(error, "Error writing tuple field data: %s", 
PQerrorMessage(pg_conn));
+        return ADBC_STATUS_IO;
+      }
+      remaining -= to_write;
+      data += to_write;
+    }
+
+    return ADBC_STATUS_OK;
+  }
+};
+}  // namespace adbcpq
diff --git a/c/driver/postgresql/meson.build b/c/driver/postgresql/meson.build
index 179cae59b..ac075417f 100644
--- a/c/driver/postgresql/meson.build
+++ b/c/driver/postgresql/meson.build
@@ -25,6 +25,7 @@ adbc_postgres_driver_lib = library(
         'database.cc',
         'postgresql.cc',
         'result_helper.cc',
+        'result_reader.cc',
         'statement.cc',
     ],
     include_directories: [include_dir, c_dir],
diff --git a/c/driver/postgresql/postgresql_test.cc 
b/c/driver/postgresql/postgresql_test.cc
index ff3dc0b70..c45168ef6 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -1302,6 +1302,140 @@ TEST_F(PostgresStatementTest, 
ExecuteSchemaParameterizedQuery) {
   ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
 }
 
+TEST_F(PostgresStatementTest, ExecuteParameterizedQueryWithResult) {
+  nanoarrow::UniqueSchema schema_bind;
+  ArrowSchemaInit(schema_bind.get());
+  ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1),
+              adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0], 
NANOARROW_TYPE_INT32),
+              adbc_validation::IsOkErrno());
+
+  nanoarrow::UniqueArray bind;
+  ASSERT_THAT(ArrowArrayInitFromSchema(bind.get(), schema_bind.get(), nullptr),
+              adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayStartAppending(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayAppendInt(bind->children[0], 123), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishElement(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayAppendInt(bind->children[0], 456), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishElement(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayAppendNull(bind->children[0], 1), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishElement(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishBuildingDefault(bind.get(), nullptr),
+              adbc_validation::IsOkErrno());
+
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT $1", &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement, bind.get(), schema_bind.get(), 
&error),
+              IsOkStatus());
+
+  {
+    adbc_validation::StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_EQ(reader.rows_affected, -1);
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_EQ(reader.schema->n_children, 1);
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->length, 1);
+    
ASSERT_EQ(reader.array_view->children[0]->buffer_views[1].data.as_int32[0], 
123);
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->length, 1);
+    
ASSERT_EQ(reader.array_view->children[0]->buffer_views[1].data.as_int32[0], 
456);
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->length, 1);
+    ASSERT_EQ(reader.array->children[0]->null_count, 1);
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->release, nullptr);
+  }
+}
+
+TEST_F(PostgresStatementTest, ExecuteParameterizedQueryWithRowsAffected) {
+  // Check that when executing one or more parameterized queries that the 
corresponding
+  // affected row count is added.
+  ASSERT_THAT(quirks()->DropTable(&connection, "adbc_test", &error), 
IsOkStatus(&error));
+
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
+
+  {
+    ASSERT_THAT(
+        AdbcStatementSetSqlQuery(&statement, "CREATE TABLE adbc_test (ints 
INT)", &error),
+        IsOkStatus(&error));
+    adbc_validation::StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_EQ(reader.rows_affected, -1);
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->release, nullptr);
+  }
+
+  {
+    // Use INSERT INTO
+    ASSERT_THAT(
+        AdbcStatementSetSqlQuery(
+            &statement, "INSERT INTO adbc_test (ints) VALUES (123), (456)", 
&error),
+        IsOkStatus(&error));
+    adbc_validation::StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_EQ(reader.rows_affected, 2);
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->release, nullptr);
+  }
+
+  nanoarrow::UniqueSchema schema_bind;
+  ArrowSchemaInit(schema_bind.get());
+  ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1),
+              adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0], 
NANOARROW_TYPE_INT32),
+              adbc_validation::IsOkErrno());
+
+  nanoarrow::UniqueArray bind;
+  ASSERT_THAT(ArrowArrayInitFromSchema(bind.get(), schema_bind.get(), nullptr),
+              adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayStartAppending(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayAppendInt(bind->children[0], 123), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishElement(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayAppendInt(bind->children[0], 456), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishElement(bind.get()), 
adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowArrayFinishBuildingDefault(bind.get(), nullptr),
+              adbc_validation::IsOkErrno());
+
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
+                                       "DELETE FROM adbc_test WHERE ints = 
$1", &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement, bind.get(), schema_bind.get(), 
&error),
+              IsOkStatus());
+
+  {
+    int64_t rows_affected = -2;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, 
&error),
+                IsOkStatus(&error));
+    ASSERT_EQ(rows_affected, 2);
+  }
+
+  {
+    ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * from 
adbc_test", &error),
+                IsOkStatus(&error));
+    adbc_validation::StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(reader.array->release, nullptr);
+  }
+}
+
 TEST_F(PostgresStatementTest, BatchSizeHint) {
   ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "batch_size_hint_test", 
&error),
               IsOkStatus(&error));
diff --git a/c/driver/postgresql/result_helper.cc 
b/c/driver/postgresql/result_helper.cc
index df890a7c5..157b100b7 100644
--- a/c/driver/postgresql/result_helper.cc
+++ b/c/driver/postgresql/result_helper.cc
@@ -20,7 +20,6 @@
 #include <charconv>
 #include <memory>
 
-#include "copy/reader.h"
 #include "driver/common/utils.h"
 #include "error.h"
 
@@ -216,157 +215,4 @@ int64_t PqResultHelper::AffectedRows() {
   }
 }
 
-int PqResultArrayReader::GetSchema(struct ArrowSchema* out) {
-  ResetErrors();
-
-  if (schema_->release == nullptr) {
-    AdbcStatusCode status = Initialize(&error_);
-    if (status != ADBC_STATUS_OK) {
-      return EINVAL;
-    }
-  }
-
-  return ArrowSchemaDeepCopy(schema_.get(), out);
-}
-
-int PqResultArrayReader::GetNext(struct ArrowArray* out) {
-  ResetErrors();
-
-  if (schema_->release == nullptr) {
-    AdbcStatusCode status = Initialize(&error_);
-    if (status != ADBC_STATUS_OK) {
-      return EINVAL;
-    }
-  }
-
-  if (!helper_.HasResult()) {
-    out->release = nullptr;
-    return NANOARROW_OK;
-  }
-
-  nanoarrow::UniqueArray tmp;
-  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(tmp.get(), schema_.get(), 
&na_error_));
-  NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(tmp.get()));
-  for (int i = 0; i < helper_.NumColumns(); i++) {
-    NANOARROW_RETURN_NOT_OK(field_readers_[i]->InitArray(tmp->children[i]));
-  }
-
-  // TODO: If we get an EOVERFLOW here (e.g., big string data), we
-  // would need to keep track of what row number we're on and start
-  // from there instead of begin() on the next call. We could also
-  // respect the size hint here to chunk the batches.
-  struct ArrowBufferView item;
-  for (auto it = helper_.begin(); it != helper_.end(); it++) {
-    auto row = *it;
-    for (int i = 0; i < helper_.NumColumns(); i++) {
-      auto pg_item = row[i];
-      item.data.data = pg_item.data;
-
-      if (pg_item.is_null) {
-        item.size_bytes = -1;
-      } else {
-        item.size_bytes = pg_item.len;
-      }
-
-      NANOARROW_RETURN_NOT_OK(
-          field_readers_[i]->Read(&item, item.size_bytes, tmp->children[i], 
&na_error_));
-    }
-  }
-
-  for (int i = 0; i < helper_.NumColumns(); i++) {
-    NANOARROW_RETURN_NOT_OK(field_readers_[i]->FinishArray(tmp->children[i], 
&na_error_));
-  }
-
-  tmp->length = helper_.NumRows();
-  tmp->null_count = 0;
-  NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(tmp.get(), 
&na_error_));
-
-  // Ensure that the next call to GetNext() will signal the end of the stream
-  helper_.ClearResult();
-
-  // Canonically return zero-size results as an empty stream
-  if (tmp->length == 0) {
-    out->release = nullptr;
-    return NANOARROW_OK;
-  }
-
-  ArrowArrayMove(tmp.get(), out);
-  return NANOARROW_OK;
-}
-
-const char* PqResultArrayReader::GetLastError() {
-  if (error_.message != nullptr) {
-    return error_.message;
-  } else {
-    return na_error_.message;
-  }
-}
-
-AdbcStatusCode PqResultArrayReader::Initialize(struct AdbcError* error) {
-  helper_.set_output_format(PqResultHelper::Format::kBinary);
-  RAISE_ADBC(helper_.Execute(error));
-
-  ArrowSchemaInit(schema_.get());
-  CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(), 
helper_.NumColumns()),
-                  &na_error_, error);
-
-  for (int i = 0; i < helper_.NumColumns(); i++) {
-    PostgresType child_type;
-    CHECK_NA_DETAIL(INTERNAL,
-                    type_resolver_->Find(helper_.FieldType(i), &child_type, 
&na_error_),
-                    &na_error_, error);
-
-    CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error);
-    CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i], 
helper_.FieldName(i)),
-             error);
-
-    std::unique_ptr<PostgresCopyFieldReader> child_reader;
-    CHECK_NA_DETAIL(
-        INTERNAL,
-        MakeCopyFieldReader(child_type, schema_->children[i], &child_reader, 
&na_error_),
-        &na_error_, error);
-
-    child_reader->Init(child_type);
-    CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]), 
&na_error_,
-                    error);
-
-    field_readers_.push_back(std::move(child_reader));
-  }
-
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
-                                                  struct ArrowArrayStream* out,
-                                                  struct AdbcError* error) {
-  if (out == nullptr) {
-    // If there is no output requested, we still need to execute and set
-    // affected_rows if needed. We don't need an output schema or to set
-    // up a copy reader, so we can skip those steps by going straight
-    // to Execute(). This also enables us to support queries with multiple
-    // statements because we can call PQexec() instead of PQexecParams().
-    RAISE_ADBC(helper_.Execute(error));
-
-    if (affected_rows != nullptr) {
-      *affected_rows = helper_.AffectedRows();
-    }
-
-    return ADBC_STATUS_OK;
-  }
-
-  // Execute eagerly. We need this to provide row counts for DELETE and
-  // CREATE TABLE queries as well as to provide more informative errors
-  // until this reader class is wired up to provide extended AdbcError
-  // information.
-  RAISE_ADBC(Initialize(error));
-  if (affected_rows != nullptr) {
-    *affected_rows = helper_.AffectedRows();
-  }
-
-  nanoarrow::ArrayStreamFactory<PqResultArrayReader>::InitArrayStream(
-      new PqResultArrayReader(this), out);
-
-  return ADBC_STATUS_OK;
-}
-
 }  // namespace adbcpq
diff --git a/c/driver/postgresql/result_helper.h 
b/c/driver/postgresql/result_helper.h
index 43083b8bc..18de7958b 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -109,6 +109,11 @@ class PqResultHelper {
 
   bool HasResult() { return result_ != nullptr; }
 
+  void SetResult(PGresult* result) {
+    ClearResult();
+    result_ = result;
+  }
+
   PGresult* ReleaseResult();
 
   void ClearResult() {
@@ -169,51 +174,4 @@ class PqResultHelper {
                                  struct AdbcError* error);
 };
 
-class PqResultArrayReader {
- public:
-  PqResultArrayReader(PGconn* conn, std::shared_ptr<PostgresTypeResolver> 
type_resolver,
-                      std::string query)
-      : helper_(conn, std::move(query)), type_resolver_(type_resolver) {
-    ArrowErrorInit(&na_error_);
-    error_ = ADBC_ERROR_INIT;
-  }
-
-  ~PqResultArrayReader() { ResetErrors(); }
-
-  int GetSchema(struct ArrowSchema* out);
-  int GetNext(struct ArrowArray* out);
-  const char* GetLastError();
-
-  AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct 
ArrowArrayStream* out,
-                               struct AdbcError* error);
-
-  AdbcStatusCode Initialize(struct AdbcError* error);
-
- private:
-  PqResultHelper helper_;
-  std::shared_ptr<PostgresTypeResolver> type_resolver_;
-  std::vector<std::unique_ptr<PostgresCopyFieldReader>> field_readers_;
-  nanoarrow::UniqueSchema schema_;
-  struct AdbcError error_;
-  struct ArrowError na_error_;
-
-  explicit PqResultArrayReader(PqResultArrayReader* other)
-      : helper_(std::move(other->helper_)),
-        type_resolver_(std::move(other->type_resolver_)),
-        field_readers_(std::move(other->field_readers_)),
-        schema_(std::move(other->schema_)) {
-    ArrowErrorInit(&na_error_);
-    error_ = ADBC_ERROR_INIT;
-  }
-
-  void ResetErrors() {
-    ArrowErrorInit(&na_error_);
-
-    if (error_.private_data != nullptr) {
-      error_.release(&error_);
-    }
-    error_ = ADBC_ERROR_INIT;
-  }
-};
-
 }  // namespace adbcpq
diff --git a/c/driver/postgresql/result_reader.cc 
b/c/driver/postgresql/result_reader.cc
new file mode 100644
index 000000000..21bc2bdbc
--- /dev/null
+++ b/c/driver/postgresql/result_reader.cc
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "result_reader.h"
+
+#include <memory>
+#include <utility>
+
+#include "copy/reader.h"
+#include "driver/common/utils.h"
+
+#include "error.h"
+
+namespace adbcpq {
+
+int PqResultArrayReader::GetSchema(struct ArrowSchema* out) {
+  ResetErrors();
+
+  if (schema_->release == nullptr) {
+    AdbcStatusCode status = Initialize(nullptr, &error_);
+    if (status != ADBC_STATUS_OK) {
+      return EINVAL;
+    }
+  }
+
+  return ArrowSchemaDeepCopy(schema_.get(), out);
+}
+
+int PqResultArrayReader::GetNext(struct ArrowArray* out) {
+  ResetErrors();
+
+  AdbcStatusCode status;
+  if (schema_->release == nullptr) {
+    AdbcStatusCode status = Initialize(nullptr, &error_);
+    if (status != ADBC_STATUS_OK) {
+      return EINVAL;
+    }
+  }
+
+  // If don't already have a result, populate it by binding the next row
+  // in the bind stream. If this is the first call to GetNext(), we have
+  // already populated the result.
+  if (!helper_.HasResult()) {
+    // If there was no bind stream provided or the existing bind stream has 
been
+    // exhausted, we are done.
+    if (!bind_stream_) {
+      out->release = nullptr;
+      return NANOARROW_OK;
+    }
+
+    // Keep binding and executing until we have a result to return
+    status = BindNextAndExecute(nullptr, &error_);
+    if (status != ADBC_STATUS_OK) {
+      return EIO;
+    }
+
+    // It's possible that there is still nothing to do here
+    if (!helper_.HasResult()) {
+      out->release = nullptr;
+      return NANOARROW_OK;
+    }
+  }
+
+  nanoarrow::UniqueArray tmp;
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(tmp.get(), schema_.get(), 
&na_error_));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(tmp.get()));
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    NANOARROW_RETURN_NOT_OK(field_readers_[i]->InitArray(tmp->children[i]));
+  }
+
+  // TODO: If we get an EOVERFLOW here (e.g., big string data), we
+  // would need to keep track of what row number we're on and start
+  // from there instead of begin() on the next call. We could also
+  // respect the size hint here to chunk the batches.
+  struct ArrowBufferView item;
+  for (auto it = helper_.begin(); it != helper_.end(); it++) {
+    auto row = *it;
+    for (int i = 0; i < helper_.NumColumns(); i++) {
+      auto pg_item = row[i];
+      item.data.data = pg_item.data;
+
+      if (pg_item.is_null) {
+        item.size_bytes = -1;
+      } else {
+        item.size_bytes = pg_item.len;
+      }
+
+      NANOARROW_RETURN_NOT_OK(
+          field_readers_[i]->Read(&item, item.size_bytes, tmp->children[i], 
&na_error_));
+    }
+  }
+
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    NANOARROW_RETURN_NOT_OK(field_readers_[i]->FinishArray(tmp->children[i], 
&na_error_));
+  }
+
+  tmp->length = helper_.NumRows();
+  tmp->null_count = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(tmp.get(), 
&na_error_));
+
+  // Signal that the next call to GetNext() will have to populate the result 
again
+  helper_.ClearResult();
+
+  // Canonically return zero-size results as an empty stream
+  if (tmp->length == 0) {
+    out->release = nullptr;
+    return NANOARROW_OK;
+  }
+
+  ArrowArrayMove(tmp.get(), out);
+  return NANOARROW_OK;
+}
+
+const char* PqResultArrayReader::GetLastError() {
+  if (error_.message != nullptr) {
+    return error_.message;
+  } else {
+    return na_error_.message;
+  }
+}
+
+AdbcStatusCode PqResultArrayReader::Initialize(int64_t* rows_affected,
+                                               struct AdbcError* error) {
+  helper_.set_output_format(PqResultHelper::Format::kBinary);
+  helper_.set_param_format(PqResultHelper::Format::kBinary);
+
+  // If we have to do binding, set up the bind stream an execute until
+  // there is a result with more than zero rows to populate.
+  if (bind_stream_) {
+    RAISE_ADBC(bind_stream_->Begin([] { return ADBC_STATUS_OK; }, error));
+    RAISE_ADBC(bind_stream_->SetParamTypes(*type_resolver_, error));
+    RAISE_ADBC(helper_.Prepare(bind_stream_->param_types, error));
+
+    RAISE_ADBC(BindNextAndExecute(nullptr, error));
+
+    // If there were no arrays in the bind stream, we still need a result
+    // to populate the schema. If there were any arrays in the bind stream,
+    // the last one will still be in helper_ even if it had zero rows.
+    if (!helper_.HasResult()) {
+      RAISE_ADBC(helper_.DescribePrepared(error));
+    }
+
+    // We can't provide affected row counts if there is a bind stream and
+    // an output because we don't know how many future bind arrays/rows there
+    // might be.
+    if (rows_affected != nullptr) {
+      *rows_affected = -1;
+    }
+  } else {
+    RAISE_ADBC(helper_.Execute(error));
+    if (rows_affected != nullptr) {
+      *rows_affected = helper_.AffectedRows();
+    }
+  }
+
+  // Build the schema for which we are about to build results
+  ArrowSchemaInit(schema_.get());
+  CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(), 
helper_.NumColumns()),
+                  &na_error_, error);
+
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    PostgresType child_type;
+    CHECK_NA_DETAIL(INTERNAL,
+                    type_resolver_->Find(helper_.FieldType(i), &child_type, 
&na_error_),
+                    &na_error_, error);
+
+    CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error);
+    CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i], 
helper_.FieldName(i)),
+             error);
+
+    std::unique_ptr<PostgresCopyFieldReader> child_reader;
+    CHECK_NA_DETAIL(
+        INTERNAL,
+        MakeCopyFieldReader(child_type, schema_->children[i], &child_reader, 
&na_error_),
+        &na_error_, error);
+
+    child_reader->Init(child_type);
+    CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]), 
&na_error_,
+                    error);
+
+    field_readers_.push_back(std::move(child_reader));
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
+                                                  struct ArrowArrayStream* out,
+                                                  struct AdbcError* error) {
+  if (out == nullptr) {
+    // If there is no output requested, we still need to execute and
+    // set affected_rows if needed. We don't need an output schema or to set 
up a copy
+    // reader, so we can skip those steps by going straight to Execute(). This 
also
+    // enables us to support queries with multiple statements because we can 
call PQexec()
+    // instead of PQexecParams().
+    RAISE_ADBC(ExecuteAll(affected_rows, error));
+    return ADBC_STATUS_OK;
+  }
+
+  // Otherwise, execute until we have a result to return. We need this to 
provide row
+  // counts for DELETE and CREATE TABLE queries as well as to provide more 
informative
+  // errors until this reader class is wired up to provide extended AdbcError 
information.
+  RAISE_ADBC(Initialize(affected_rows, error));
+
+  nanoarrow::ArrayStreamFactory<PqResultArrayReader>::InitArrayStream(
+      new PqResultArrayReader(this), out);
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultArrayReader::BindNextAndExecute(int64_t* affected_rows,
+                                                       AdbcError* error) {
+  // Keep pulling from the bind stream and executing as long as
+  // we receive results with zero rows.
+  do {
+    RAISE_ADBC(bind_stream_->EnsureNextRow(error));
+    if (!bind_stream_->current->release) {
+      RAISE_ADBC(bind_stream_->Cleanup(conn_, error));
+      bind_stream_.reset();
+      return ADBC_STATUS_OK;
+    }
+
+    PGresult* result;
+    RAISE_ADBC(bind_stream_->BindAndExecuteCurrentRow(
+        conn_, &result, /*result_format*/ kPgBinaryFormat, error));
+    helper_.SetResult(result);
+    if (affected_rows) {
+      (*affected_rows) += helper_.AffectedRows();
+    }
+  } while (helper_.NumRows() == 0);
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultArrayReader::ExecuteAll(int64_t* affected_rows, 
AdbcError* error) {
+  // For the case where we don't need a result, we either need to exhaust the 
bind
+  // stream (if there is one) or execute the query without binding.
+  if (bind_stream_) {
+    RAISE_ADBC(bind_stream_->Begin([] { return ADBC_STATUS_OK; }, error));
+    RAISE_ADBC(bind_stream_->SetParamTypes(*type_resolver_, error));
+    RAISE_ADBC(helper_.Prepare(bind_stream_->param_types, error));
+
+    // Reset affected rows to zero before binding and executing any
+    if (affected_rows) {
+      (*affected_rows) = 0;
+    }
+
+    do {
+      RAISE_ADBC(BindNextAndExecute(affected_rows, error));
+    } while (bind_stream_);
+  } else {
+    RAISE_ADBC(helper_.Execute(error));
+
+    if (affected_rows != nullptr) {
+      *affected_rows = helper_.AffectedRows();
+    }
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+}  // namespace adbcpq
diff --git a/c/driver/postgresql/result_reader.h 
b/c/driver/postgresql/result_reader.h
new file mode 100644
index 000000000..51da6399e
--- /dev/null
+++ b/c/driver/postgresql/result_reader.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <libpq-fe.h>
+
+#include "bind_stream.h"
+#include "copy/reader.h"
+#include "result_helper.h"
+
+namespace adbcpq {
+
+class PqResultArrayReader {
+ public:
+  PqResultArrayReader(PGconn* conn, std::shared_ptr<PostgresTypeResolver> 
type_resolver,
+                      std::string query)
+      : conn_(conn), helper_(conn, std::move(query)), 
type_resolver_(type_resolver) {
+    ArrowErrorInit(&na_error_);
+    error_ = ADBC_ERROR_INIT;
+  }
+
+  ~PqResultArrayReader() { ResetErrors(); }
+
+  void SetBind(struct ArrowArrayStream* stream) {
+    bind_stream_ = std::make_unique<BindStream>();
+    bind_stream_->SetBind(stream);
+  }
+
+  int GetSchema(struct ArrowSchema* out);
+  int GetNext(struct ArrowArray* out);
+  const char* GetLastError();
+
+  AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct 
ArrowArrayStream* out,
+                               struct AdbcError* error);
+
+  AdbcStatusCode Initialize(int64_t* affected_rows, struct AdbcError* error);
+
+ private:
+  PGconn* conn_;
+  PqResultHelper helper_;
+  std::unique_ptr<BindStream> bind_stream_;
+  std::shared_ptr<PostgresTypeResolver> type_resolver_;
+  std::vector<std::unique_ptr<PostgresCopyFieldReader>> field_readers_;
+  nanoarrow::UniqueSchema schema_;
+  struct AdbcError error_;
+  struct ArrowError na_error_;
+
+  explicit PqResultArrayReader(PqResultArrayReader* other)
+      : conn_(other->conn_),
+        helper_(std::move(other->helper_)),
+        bind_stream_(std::move(other->bind_stream_)),
+        type_resolver_(std::move(other->type_resolver_)),
+        field_readers_(std::move(other->field_readers_)),
+        schema_(std::move(other->schema_)) {
+    ArrowErrorInit(&na_error_);
+    error_ = ADBC_ERROR_INIT;
+  }
+
+  AdbcStatusCode BindNextAndExecute(int64_t* affected_rows, AdbcError* error);
+  AdbcStatusCode ExecuteAll(int64_t* affected_rows, AdbcError* error);
+
+  void ResetErrors() {
+    ArrowErrorInit(&na_error_);
+
+    if (error_.private_data != nullptr) {
+      error_.release(&error_);
+    }
+    error_ = ADBC_ERROR_INIT;
+  }
+};
+
+}  // namespace adbcpq
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index c6e012581..224472bdc 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -36,613 +36,18 @@
 #include <libpq-fe.h>
 #include <nanoarrow/nanoarrow.hpp>
 
+#include "bind_stream.h"
 #include "connection.h"
-#include "copy/writer.h"
 #include "driver/common/options.h"
 #include "driver/common/utils.h"
 #include "error.h"
 #include "postgres_type.h"
 #include "postgres_util.h"
 #include "result_helper.h"
+#include "result_reader.h"
 
 namespace adbcpq {
 
-namespace {
-/// The flag indicating to PostgreSQL that we want binary-format values.
-constexpr int kPgBinaryFormat = 1;
-
-/// One-value ArrowArrayStream used to unify the implementations of Bind
-struct OneValueStream {
-  struct ArrowSchema schema;
-  struct ArrowArray array;
-
-  static int GetSchema(struct ArrowArrayStream* self, struct ArrowSchema* out) 
{
-    OneValueStream* stream = static_cast<OneValueStream*>(self->private_data);
-    return ArrowSchemaDeepCopy(&stream->schema, out);
-  }
-  static int GetNext(struct ArrowArrayStream* self, struct ArrowArray* out) {
-    OneValueStream* stream = static_cast<OneValueStream*>(self->private_data);
-    *out = stream->array;
-    stream->array.release = nullptr;
-    return 0;
-  }
-  static const char* GetLastError(struct ArrowArrayStream* self) { return 
NULL; }
-  static void Release(struct ArrowArrayStream* self) {
-    OneValueStream* stream = static_cast<OneValueStream*>(self->private_data);
-    if (stream->schema.release) {
-      stream->schema.release(&stream->schema);
-      stream->schema.release = nullptr;
-    }
-    if (stream->array.release) {
-      stream->array.release(&stream->array);
-      stream->array.release = nullptr;
-    }
-    delete stream;
-    self->release = nullptr;
-  }
-};
-
-/// Helper to manage bind parameters with a prepared statement
-struct BindStream {
-  Handle<struct ArrowArrayStream> bind;
-  Handle<struct ArrowSchema> bind_schema;
-  struct ArrowSchemaView bind_schema_view;
-  std::vector<struct ArrowSchemaView> bind_schema_fields;
-
-  // OIDs for parameter types
-  std::vector<uint32_t> param_types;
-  std::vector<char*> param_values;
-  std::vector<int> param_lengths;
-  std::vector<int> param_formats;
-  std::vector<size_t> param_values_offsets;
-  std::vector<char> param_values_buffer;
-  // XXX: this assumes fixed-length fields only - will need more
-  // consideration to deal with variable-length fields
-
-  bool has_tz_field = false;
-  std::string tz_setting;
-
-  struct ArrowError na_error;
-
-  explicit BindStream(struct ArrowArrayStream&& bind) {
-    this->bind.value = std::move(bind);
-    std::memset(&na_error, 0, sizeof(na_error));
-  }
-
-  template <typename Callback>
-  AdbcStatusCode Begin(Callback&& callback, struct AdbcError* error) {
-    CHECK_NA(INTERNAL, bind->get_schema(&bind.value, &bind_schema.value), 
error);
-    CHECK_NA(
-        INTERNAL,
-        ArrowSchemaViewInit(&bind_schema_view, &bind_schema.value, /*error*/ 
nullptr),
-        error);
-
-    if (bind_schema_view.type != ArrowType::NANOARROW_TYPE_STRUCT) {
-      SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-
-    bind_schema_fields.resize(bind_schema->n_children);
-    for (size_t i = 0; i < bind_schema_fields.size(); i++) {
-      CHECK_NA(INTERNAL,
-               ArrowSchemaViewInit(&bind_schema_fields[i], 
bind_schema->children[i],
-                                   /*error*/ nullptr),
-               error);
-    }
-
-    return std::move(callback)();
-  }
-
-  AdbcStatusCode SetParamTypes(const PostgresTypeResolver& type_resolver,
-                               struct AdbcError* error) {
-    param_types.resize(bind_schema->n_children);
-    param_values.resize(bind_schema->n_children);
-    param_lengths.resize(bind_schema->n_children);
-    param_formats.resize(bind_schema->n_children, kPgBinaryFormat);
-    param_values_offsets.reserve(bind_schema->n_children);
-
-    for (size_t i = 0; i < bind_schema_fields.size(); i++) {
-      PostgresTypeId type_id;
-      switch (bind_schema_fields[i].type) {
-        case ArrowType::NANOARROW_TYPE_BOOL:
-          type_id = PostgresTypeId::kBool;
-          param_lengths[i] = 1;
-          break;
-        case ArrowType::NANOARROW_TYPE_INT8:
-        case ArrowType::NANOARROW_TYPE_INT16:
-          type_id = PostgresTypeId::kInt2;
-          param_lengths[i] = 2;
-          break;
-        case ArrowType::NANOARROW_TYPE_INT32:
-          type_id = PostgresTypeId::kInt4;
-          param_lengths[i] = 4;
-          break;
-        case ArrowType::NANOARROW_TYPE_INT64:
-          type_id = PostgresTypeId::kInt8;
-          param_lengths[i] = 8;
-          break;
-        case ArrowType::NANOARROW_TYPE_FLOAT:
-          type_id = PostgresTypeId::kFloat4;
-          param_lengths[i] = 4;
-          break;
-        case ArrowType::NANOARROW_TYPE_DOUBLE:
-          type_id = PostgresTypeId::kFloat8;
-          param_lengths[i] = 8;
-          break;
-        case ArrowType::NANOARROW_TYPE_STRING:
-        case ArrowType::NANOARROW_TYPE_LARGE_STRING:
-          type_id = PostgresTypeId::kText;
-          param_lengths[i] = 0;
-          break;
-        case ArrowType::NANOARROW_TYPE_BINARY:
-          type_id = PostgresTypeId::kBytea;
-          param_lengths[i] = 0;
-          break;
-        case ArrowType::NANOARROW_TYPE_DATE32:
-          type_id = PostgresTypeId::kDate;
-          param_lengths[i] = 4;
-          break;
-        case ArrowType::NANOARROW_TYPE_TIMESTAMP:
-          type_id = PostgresTypeId::kTimestamp;
-          param_lengths[i] = 8;
-          break;
-        case ArrowType::NANOARROW_TYPE_DURATION:
-        case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
-          type_id = PostgresTypeId::kInterval;
-          param_lengths[i] = 16;
-          break;
-        case ArrowType::NANOARROW_TYPE_DECIMAL128:
-        case ArrowType::NANOARROW_TYPE_DECIMAL256:
-          type_id = PostgresTypeId::kNumeric;
-          param_lengths[i] = 0;
-          break;
-        case ArrowType::NANOARROW_TYPE_DICTIONARY: {
-          struct ArrowSchemaView value_view;
-          CHECK_NA(INTERNAL,
-                   ArrowSchemaViewInit(&value_view, 
bind_schema->children[i]->dictionary,
-                                       nullptr),
-                   error);
-          switch (value_view.type) {
-            case NANOARROW_TYPE_BINARY:
-            case NANOARROW_TYPE_LARGE_BINARY:
-              type_id = PostgresTypeId::kBytea;
-              param_lengths[i] = 0;
-              break;
-            case NANOARROW_TYPE_STRING:
-            case NANOARROW_TYPE_LARGE_STRING:
-              type_id = PostgresTypeId::kText;
-              param_lengths[i] = 0;
-              break;
-            default:
-              SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
-                       static_cast<uint64_t>(i + 1), " ('",
-                       bind_schema->children[i]->name,
-                       "') has unsupported dictionary value parameter type ",
-                       ArrowTypeString(value_view.type));
-              return ADBC_STATUS_NOT_IMPLEMENTED;
-          }
-          break;
-        }
-        default:
-          SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
-                   static_cast<uint64_t>(i + 1), " ('", 
bind_schema->children[i]->name,
-                   "') has unsupported parameter type ",
-                   ArrowTypeString(bind_schema_fields[i].type));
-          return ADBC_STATUS_NOT_IMPLEMENTED;
-      }
-
-      param_types[i] = type_resolver.GetOID(type_id);
-      if (param_types[i] == 0) {
-        SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
-                 static_cast<uint64_t>(i + 1), " ('", 
bind_schema->children[i]->name,
-                 "') has type with no corresponding PostgreSQL type ",
-                 ArrowTypeString(bind_schema_fields[i].type));
-        return ADBC_STATUS_NOT_IMPLEMENTED;
-      }
-    }
-
-    size_t param_values_length = 0;
-    for (int length : param_lengths) {
-      param_values_offsets.push_back(param_values_length);
-      param_values_length += length;
-    }
-    param_values_buffer.resize(param_values_length);
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Prepare(const PostgresConnection* conn, const std::string& 
query,
-                         struct AdbcError* error, const bool autocommit) {
-    // tz-aware timestamps require special handling to set the timezone to UTC
-    // prior to sending over the binary protocol; must be reset after execute
-    const auto pg_conn = conn->conn();
-    for (int64_t col = 0; col < bind_schema->n_children; col++) {
-      if ((bind_schema_fields[col].type == 
ArrowType::NANOARROW_TYPE_TIMESTAMP) &&
-          (strcmp("", bind_schema_fields[col].timezone))) {
-        has_tz_field = true;
-
-        if (autocommit) {
-          PGresult* begin_result = PQexec(pg_conn, "BEGIN");
-          if (PQresultStatus(begin_result) != PGRES_COMMAND_OK) {
-            AdbcStatusCode code =
-                SetError(error, begin_result,
-                         "[libpq] Failed to begin transaction for timezone 
data: %s",
-                         PQerrorMessage(pg_conn));
-            PQclear(begin_result);
-            return code;
-          }
-          PQclear(begin_result);
-        }
-
-        PGresult* get_tz_result = PQexec(pg_conn, "SELECT 
current_setting('TIMEZONE')");
-        if (PQresultStatus(get_tz_result) != PGRES_TUPLES_OK) {
-          AdbcStatusCode code = SetError(error, get_tz_result,
-                                         "[libpq] Could not query current 
timezone: %s",
-                                         PQerrorMessage(pg_conn));
-          PQclear(get_tz_result);
-          return code;
-        }
-
-        tz_setting = std::string(PQgetvalue(get_tz_result, 0, 0));
-        PQclear(get_tz_result);
-
-        PGresult* set_utc_result = PQexec(pg_conn, "SET TIME ZONE 'UTC'");
-        if (PQresultStatus(set_utc_result) != PGRES_COMMAND_OK) {
-          AdbcStatusCode code = SetError(error, set_utc_result,
-                                         "[libpq] Failed to set time zone to 
UTC: %s",
-                                         PQerrorMessage(pg_conn));
-          PQclear(set_utc_result);
-          return code;
-        }
-        PQclear(set_utc_result);
-        break;
-      }
-    }
-
-    PGresult* result = PQprepare(pg_conn, /*stmtName=*/"", query.c_str(),
-                                 /*nParams=*/bind_schema->n_children, 
param_types.data());
-    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
-      AdbcStatusCode code =
-          SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery 
was:%s",
-                   PQerrorMessage(pg_conn), query.c_str());
-      PQclear(result);
-      return code;
-    }
-    PQclear(result);
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Execute(const PostgresConnection* conn, int64_t* 
rows_affected,
-                         struct AdbcError* error) {
-    if (rows_affected) *rows_affected = 0;
-    PGresult* result = nullptr;
-    const auto pg_conn = conn->conn();
-
-    while (true) {
-      Handle<struct ArrowArray> array;
-      int res = bind->get_next(&bind.value, &array.value);
-      if (res != 0) {
-        SetError(error,
-                 "[libpq] Failed to read next batch from stream of bind 
parameters: "
-                 "(%d) %s %s",
-                 res, std::strerror(res), bind->get_last_error(&bind.value));
-        return ADBC_STATUS_IO;
-      }
-      if (!array->release) break;
-
-      Handle<struct ArrowArrayView> array_view;
-      // TODO: include error messages
-      CHECK_NA(
-          INTERNAL,
-          ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value, 
nullptr),
-          error);
-      CHECK_NA(INTERNAL, ArrowArrayViewSetArray(&array_view.value, 
&array.value, nullptr),
-               error);
-
-      for (int64_t row = 0; row < array->length; row++) {
-        for (int64_t col = 0; col < array_view->n_children; col++) {
-          if (ArrowArrayViewIsNull(array_view->children[col], row)) {
-            param_values[col] = nullptr;
-            continue;
-          } else {
-            param_values[col] = param_values_buffer.data() + 
param_values_offsets[col];
-          }
-          switch (bind_schema_fields[col].type) {
-            case ArrowType::NANOARROW_TYPE_BOOL: {
-              const int8_t val = ArrowBitGet(
-                  array_view->children[col]->buffer_views[1].data.as_uint8, 
row);
-              std::memcpy(param_values[col], &val, sizeof(int8_t));
-              break;
-            }
-
-            case ArrowType::NANOARROW_TYPE_INT8: {
-              const int16_t val =
-                  array_view->children[col]->buffer_views[1].data.as_int8[row];
-              const uint16_t value = ToNetworkInt16(val);
-              std::memcpy(param_values[col], &value, sizeof(int16_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_INT16: {
-              const uint16_t value = ToNetworkInt16(
-                  
array_view->children[col]->buffer_views[1].data.as_int16[row]);
-              std::memcpy(param_values[col], &value, sizeof(int16_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_INT32: {
-              const uint32_t value = ToNetworkInt32(
-                  
array_view->children[col]->buffer_views[1].data.as_int32[row]);
-              std::memcpy(param_values[col], &value, sizeof(int32_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_INT64: {
-              const int64_t value = ToNetworkInt64(
-                  
array_view->children[col]->buffer_views[1].data.as_int64[row]);
-              std::memcpy(param_values[col], &value, sizeof(int64_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_FLOAT: {
-              const uint32_t value = ToNetworkFloat4(
-                  
array_view->children[col]->buffer_views[1].data.as_float[row]);
-              std::memcpy(param_values[col], &value, sizeof(uint32_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_DOUBLE: {
-              const uint64_t value = ToNetworkFloat8(
-                  
array_view->children[col]->buffer_views[1].data.as_double[row]);
-              std::memcpy(param_values[col], &value, sizeof(uint64_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_STRING:
-            case ArrowType::NANOARROW_TYPE_LARGE_STRING:
-            case ArrowType::NANOARROW_TYPE_BINARY: {
-              const ArrowBufferView view =
-                  ArrowArrayViewGetBytesUnsafe(array_view->children[col], row);
-              // TODO: overflow check?
-              param_lengths[col] = static_cast<int>(view.size_bytes);
-              param_values[col] = const_cast<char*>(view.data.as_char);
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_DATE32: {
-              // 2000-01-01
-              constexpr int32_t kPostgresDateEpoch = 10957;
-              const int32_t raw_value =
-                  
array_view->children[col]->buffer_views[1].data.as_int32[row];
-              if (raw_value < INT32_MIN + kPostgresDateEpoch) {
-                SetError(error, "[libpq] Field #%" PRId64 "%s%s%s%" PRId64 
"%s", col + 1,
-                         "('", bind_schema->children[col]->name, "') Row #", 
row + 1,
-                         "has value which exceeds postgres date limits");
-                return ADBC_STATUS_INVALID_ARGUMENT;
-              }
-
-              const uint32_t value = ToNetworkInt32(raw_value - 
kPostgresDateEpoch);
-              std::memcpy(param_values[col], &value, sizeof(int32_t));
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_DURATION:
-            case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
-              int64_t val = 
array_view->children[col]->buffer_views[1].data.as_int64[row];
-
-              bool overflow_safe = true;
-
-              auto unit = bind_schema_fields[col].time_unit;
-
-              switch (unit) {
-                case NANOARROW_TIME_UNIT_SECOND:
-                  overflow_safe =
-                      val <= kMaxSafeSecondsToMicros && val >= 
kMinSafeSecondsToMicros;
-                  if (overflow_safe) {
-                    val *= 1000000;
-                  }
-
-                  break;
-                case NANOARROW_TIME_UNIT_MILLI:
-                  overflow_safe =
-                      val <= kMaxSafeMillisToMicros && val >= 
kMinSafeMillisToMicros;
-                  if (overflow_safe) {
-                    val *= 1000;
-                  }
-                  break;
-                case NANOARROW_TIME_UNIT_MICRO:
-                  break;
-                case NANOARROW_TIME_UNIT_NANO:
-                  val /= 1000;
-                  break;
-              }
-
-              if (!overflow_safe) {
-                SetError(error,
-                         "[libpq] Field #%" PRId64 " ('%s') Row #%" PRId64
-                         " has value '%" PRIi64
-                         "' which exceeds PostgreSQL timestamp limits",
-                         col + 1, bind_schema->children[col]->name, row + 1,
-                         
array_view->children[col]->buffer_views[1].data.as_int64[row]);
-                return ADBC_STATUS_INVALID_ARGUMENT;
-              }
-
-              if (val < (std::numeric_limits<int64_t>::min)() + 
kPostgresTimestampEpoch) {
-                SetError(error,
-                         "[libpq] Field #%" PRId64 " ('%s') Row #%" PRId64
-                         " has value '%" PRIi64 "' which would underflow",
-                         col + 1, bind_schema->children[col]->name, row + 1,
-                         
array_view->children[col]->buffer_views[1].data.as_int64[row]);
-                return ADBC_STATUS_INVALID_ARGUMENT;
-              }
-
-              if (bind_schema_fields[col].type == 
ArrowType::NANOARROW_TYPE_TIMESTAMP) {
-                const uint64_t value = ToNetworkInt64(val - 
kPostgresTimestampEpoch);
-                std::memcpy(param_values[col], &value, sizeof(int64_t));
-              } else if (bind_schema_fields[col].type ==
-                         ArrowType::NANOARROW_TYPE_DURATION) {
-                // postgres stores an interval as a 64 bit offset in 
microsecond
-                // resolution alongside a 32 bit day and 32 bit month
-                // for now we just send 0 for the day / month values
-                const uint64_t value = ToNetworkInt64(val);
-                std::memcpy(param_values[col], &value, sizeof(int64_t));
-                std::memset(param_values[col] + sizeof(int64_t), 0, 
sizeof(int64_t));
-              }
-              break;
-            }
-            case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: {
-              struct ArrowInterval interval;
-              ArrowIntervalInit(&interval, 
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
-              ArrowArrayViewGetIntervalUnsafe(array_view->children[col], row, 
&interval);
-
-              const uint32_t months = ToNetworkInt32(interval.months);
-              const uint32_t days = ToNetworkInt32(interval.days);
-              const uint64_t ms = ToNetworkInt64(interval.ns / 1000);
-
-              std::memcpy(param_values[col], &ms, sizeof(uint64_t));
-              std::memcpy(param_values[col] + sizeof(uint64_t), &days, 
sizeof(uint32_t));
-              std::memcpy(param_values[col] + sizeof(uint64_t) + 
sizeof(uint32_t),
-                          &months, sizeof(uint32_t));
-              break;
-            }
-            default:
-              SetError(error, "%s%" PRId64 "%s%s%s%s", "[libpq] Field #", col 
+ 1, " ('",
-                       bind_schema->children[col]->name,
-                       "') has unsupported type for ingestion ",
-                       ArrowTypeString(bind_schema_fields[col].type));
-              return ADBC_STATUS_NOT_IMPLEMENTED;
-          }
-        }
-
-        result = PQexecPrepared(pg_conn, /*stmtName=*/"",
-                                /*nParams=*/bind_schema->n_children, 
param_values.data(),
-                                param_lengths.data(), param_formats.data(),
-                                /*resultFormat=*/0 /*text*/);
-
-        ExecStatusType pg_status = PQresultStatus(result);
-        if (pg_status != PGRES_COMMAND_OK) {
-          AdbcStatusCode code = SetError(
-              error, result, "[libpq] Failed to execute prepared statement: %s 
%s",
-              PQresStatus(pg_status), PQerrorMessage(pg_conn));
-          PQclear(result);
-          return code;
-        }
-
-        PQclear(result);
-      }
-      if (rows_affected) *rows_affected += array->length;
-
-      if (has_tz_field) {
-        std::string reset_query = "SET TIME ZONE '" + tz_setting + "'";
-        PGresult* reset_tz_result = PQexec(pg_conn, reset_query.c_str());
-        if (PQresultStatus(reset_tz_result) != PGRES_COMMAND_OK) {
-          AdbcStatusCode code =
-              SetError(error, reset_tz_result, "[libpq] Failed to reset time 
zone: %s",
-                       PQerrorMessage(pg_conn));
-          PQclear(reset_tz_result);
-          return code;
-        }
-        PQclear(reset_tz_result);
-
-        PGresult* commit_result = PQexec(pg_conn, "COMMIT");
-        if (PQresultStatus(commit_result) != PGRES_COMMAND_OK) {
-          AdbcStatusCode code =
-              SetError(error, commit_result, "[libpq] Failed to commit 
transaction: %s",
-                       PQerrorMessage(pg_conn));
-          PQclear(commit_result);
-          return code;
-        }
-        PQclear(commit_result);
-      }
-    }
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode ExecuteCopy(const PostgresConnection* conn, int64_t* 
rows_affected,
-                             struct AdbcError* error) {
-    if (rows_affected) *rows_affected = 0;
-    const auto pg_conn = conn->conn();
-
-    PostgresCopyStreamWriter writer;
-    CHECK_NA(INTERNAL, writer.Init(&bind_schema.value), error);
-    CHECK_NA(INTERNAL, writer.InitFieldWriters(*conn->type_resolver(), 
nullptr), error);
-
-    CHECK_NA(INTERNAL, writer.WriteHeader(nullptr), error);
-
-    while (true) {
-      Handle<struct ArrowArray> array;
-      int res = bind->get_next(&bind.value, &array.value);
-      if (res != 0) {
-        SetError(error,
-                 "[libpq] Failed to read next batch from stream of bind 
parameters: "
-                 "(%d) %s %s",
-                 res, std::strerror(res), bind->get_last_error(&bind.value));
-        return ADBC_STATUS_IO;
-      }
-      if (!array->release) break;
-
-      CHECK_NA(INTERNAL, writer.SetArray(&array.value), error);
-
-      // build writer buffer
-      int write_result;
-      do {
-        write_result = writer.WriteRecord(nullptr);
-      } while (write_result == NANOARROW_OK);
-
-      // check if not ENODATA at exit
-      if (write_result != ENODATA) {
-        SetError(error, "Error occurred writing COPY data: %s", 
PQerrorMessage(pg_conn));
-        return ADBC_STATUS_IO;
-      }
-
-      RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
-
-      if (rows_affected) *rows_affected += array->length;
-      writer.Rewind();
-    }
-
-    // If there were no arrays in the stream, we haven't flushed yet
-    RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
-
-    if (PQputCopyEnd(pg_conn, NULL) <= 0) {
-      SetError(error, "Error message returned by PQputCopyEnd: %s",
-               PQerrorMessage(pg_conn));
-      return ADBC_STATUS_IO;
-    }
-
-    PGresult* result = PQgetResult(pg_conn);
-    ExecStatusType pg_status = PQresultStatus(result);
-    if (pg_status != PGRES_COMMAND_OK) {
-      AdbcStatusCode code =
-          SetError(error, result, "[libpq] Failed to execute COPY statement: 
%s %s",
-                   PQresStatus(pg_status), PQerrorMessage(pg_conn));
-      PQclear(result);
-      return code;
-    }
-
-    PQclear(result);
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode FlushCopyWriterToConn(PGconn* pg_conn,
-                                       const PostgresCopyStreamWriter& writer,
-                                       struct AdbcError* error) {
-    // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
-    // size for a single message that we need to respect (1 GiB - 1).  Since
-    // the buffer can be chunked up as much as we want, go for 16 MiB as our
-    // limit.
-    // 
https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
-    constexpr int64_t kMaxCopyBufferSize = 0x1000000;
-    ArrowBuffer buffer = writer.WriteBuffer();
-
-    auto* data = reinterpret_cast<char*>(buffer.data);
-    int64_t remaining = buffer.size_bytes;
-    while (remaining > 0) {
-      int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
-      if (PQputCopyData(pg_conn, data, to_write) <= 0) {
-        SetError(error, "Error writing tuple field data: %s", 
PQerrorMessage(pg_conn));
-        return ADBC_STATUS_IO;
-      }
-      remaining -= to_write;
-      data += to_write;
-    }
-
-    return ADBC_STATUS_OK;
-  }
-};
-}  // namespace
-
 int TupleReader::GetSchema(struct ArrowSchema* out) {
   assert(copy_reader_ != nullptr);
 
@@ -902,13 +307,7 @@ AdbcStatusCode PostgresStatement::Bind(struct ArrowArray* 
values,
 
   if (bind_.release) bind_.release(&bind_);
   // Make a one-value stream
-  bind_.private_data = new OneValueStream{*schema, *values};
-  bind_.get_schema = &OneValueStream::GetSchema;
-  bind_.get_next = &OneValueStream::GetNext;
-  bind_.get_last_error = &OneValueStream::GetLastError;
-  bind_.release = &OneValueStream::Release;
-  std::memset(values, 0, sizeof(*values));
-  std::memset(schema, 0, sizeof(*schema));
+  nanoarrow::VectorArrayStream(schema, values).ToArrayStream(&bind_);
   return ADBC_STATUS_OK;
 }
 
@@ -1136,23 +535,9 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
 AdbcStatusCode PostgresStatement::ExecuteBind(struct ArrowArrayStream* stream,
                                               int64_t* rows_affected,
                                               struct AdbcError* error) {
-  if (stream) {
-    // TODO:
-    SetError(error, "%s",
-             "[libpq] Prepared statements with parameters returning result 
sets are not "
-             "implemented");
-
-    return ADBC_STATUS_NOT_IMPLEMENTED;
-  }
-
-  BindStream bind_stream(std::move(bind_));
-  std::memset(&bind_, 0, sizeof(bind_));
-
-  RAISE_ADBC(bind_stream.Begin([&]() { return ADBC_STATUS_OK; }, error));
-  RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));
-  RAISE_ADBC(
-      bind_stream.Prepare(connection_.get(), query_, error, 
connection_->autocommit()));
-  RAISE_ADBC(bind_stream.Execute(connection_.get(), rows_affected, error));
+  PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+  reader.SetBind(&bind_);
+  RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
   return ADBC_STATUS_OK;
 }
 
@@ -1300,7 +685,8 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct 
ArrowArrayStream* stream,
     current_schema = (*it)[0].data;
   }
 
-  BindStream bind_stream(std::move(bind_));
+  BindStream bind_stream;
+  bind_stream.SetBind(&bind_);
   std::memset(&bind_, 0, sizeof(bind_));
   std::string escaped_table;
   std::string escaped_field_list;
@@ -1328,7 +714,8 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct 
ArrowArrayStream* stream,
   }
 
   PQclear(result);
-  RAISE_ADBC(bind_stream.ExecuteCopy(connection_.get(), rows_affected, error));
+  RAISE_ADBC(bind_stream.ExecuteCopy(connection_->conn(), 
*connection_->type_resolver(),
+                                     rows_affected, error));
   return ADBC_STATUS_OK;
 }
 
diff --git a/r/adbcpostgresql/src/Makevars.in b/r/adbcpostgresql/src/Makevars.in
index d3c62fad6..c34b0e160 100644
--- a/r/adbcpostgresql/src/Makevars.in
+++ b/r/adbcpostgresql/src/Makevars.in
@@ -27,5 +27,6 @@ OBJECTS = init.o \
     c/driver/postgresql/error.o \
     c/driver/postgresql/postgresql.o \
     c/driver/postgresql/result_helper.o \
+    c/driver/postgresql/result_reader.o \
     c/driver/postgresql/statement.o \
     c/vendor/nanoarrow/nanoarrow.o
diff --git a/r/adbcpostgresql/src/Makevars.ucrt 
b/r/adbcpostgresql/src/Makevars.ucrt
index 857c45b77..37b101344 100644
--- a/r/adbcpostgresql/src/Makevars.ucrt
+++ b/r/adbcpostgresql/src/Makevars.ucrt
@@ -28,5 +28,6 @@ OBJECTS = init.o \
     c/driver/postgresql/error.o \
     c/driver/postgresql/postgresql.o \
     c/driver/postgresql/result_helper.o \
+    c/driver/postgresql/result_reader.o \
     c/driver/postgresql/statement.o \
     c/vendor/nanoarrow/nanoarrow.o
diff --git a/r/adbcpostgresql/src/Makevars.win 
b/r/adbcpostgresql/src/Makevars.win
index abd5d82aa..fe715ef3b 100644
--- a/r/adbcpostgresql/src/Makevars.win
+++ b/r/adbcpostgresql/src/Makevars.win
@@ -30,6 +30,7 @@ OBJECTS = init.o \
     c/driver/postgresql/error.o \
     c/driver/postgresql/postgresql.o \
     c/driver/postgresql/result_helper.o \
+    c/driver/postgresql/result_reader.o \
     c/driver/postgresql/statement.o \
     c/vendor/nanoarrow/nanoarrow.o
 

Reply via email to