This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 28b8c1b73 feat(c/driver/postgresql): Enable basic connect/query
workflow for Redshift (#2219)
28b8c1b73 is described below
commit 28b8c1b73d0e925deb3e21e8c238f32252279933
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Nov 5 18:16:08 2024 +0000
feat(c/driver/postgresql): Enable basic connect/query workflow for Redshift
(#2219)
Just following up on #1563 to see if the missing `typarray` column is
the only issue. To get the details right might be a large project, but
we might be able to support a basic connection without too much effort.
Parameter binding and non-COPY result fetching seem to work; however, the default
query fetch method (COPY) is not supported, `connection_get_info()`
fails, and at a glance, `connection_get_objects()` might be returning
incorrect results (and fails at the column depth).
``` r
library(adbcdrivermanager)
db <- adbc_database_init(
adbcpostgresql::adbcpostgresql(),
uri = Sys.getenv("ADBC_REDSHIFT_TEST_URI"),
adbc.postgresql.load_array_types = FALSE
)
con <- db |>
adbc_connection_init()
stmt <- con |>
adbc_statement_init(adbc.postgresql.use_copy = FALSE)
stream <- nanoarrow::nanoarrow_allocate_array_stream()
stmt |>
adbc_statement_bind(data.frame(45)) |>
adbc_statement_set_sql_query("SELECT 1 + $1 as foofy, 'string' as
foofy_string") |>
adbc_statement_execute_query(stream)
#> [1] -1
tibble::as_tibble(stream)
#> # A tibble: 1 × 2
#> foofy foofy_string
#> <dbl> <chr>
#> 1 46 string
```
<sup>Created on 2024-10-04 with [reprex
v2.1.1](https://reprex.tidyverse.org)</sup>
---------
Co-authored-by: William Ayd <[email protected]>
---
c/driver/postgresql/connection.cc | 91 ++++---
c/driver/postgresql/connection.h | 3 +
.../postgresql/copy/postgres_copy_reader_test.cc | 2 +-
c/driver/postgresql/copy/reader.h | 5 +-
c/driver/postgresql/database.cc | 269 +++++++++++++--------
c/driver/postgresql/database.h | 23 +-
c/driver/postgresql/postgres_type.h | 41 +++-
c/driver/postgresql/postgres_type_test.cc | 5 +
c/driver/postgresql/result_reader.cc | 6 +-
c/driver/postgresql/result_reader.h | 5 +
c/driver/postgresql/statement.cc | 30 ++-
c/driver/postgresql/statement.h | 6 +-
docs/source/driver/postgresql.rst | 7 +
13 files changed, 339 insertions(+), 154 deletions(-)
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index 17732810f..b5f12ca73 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -17,6 +17,7 @@
#include "connection.h"
+#include <array>
#include <cassert>
#include <cinttypes>
#include <cmath>
@@ -175,6 +176,13 @@ class PostgresGetObjectsHelper : public
adbc::driver::GetObjectsHelper {
all_constraints_(conn, kConstraintsQueryAll),
some_constraints_(conn, ConstraintsQuery()) {}
+ // Allow Redshift to execute this query without constraints
+ // TODO(paleolimbot): Investigate to see if we can simplify the constraits
query so that
+ // it works on both!
+ void SetEnableConstraints(bool enable_constraints) {
+ enable_constraints_ = enable_constraints;
+ }
+
Status Load(adbc::driver::GetObjectsDepth depth,
std::optional<std::string_view> catalog_filter,
std::optional<std::string_view> schema_filter,
@@ -262,16 +270,23 @@ class PostgresGetObjectsHelper : public
adbc::driver::GetObjectsHelper {
std::optional<std::string_view> column_filter) override {
if (column_filter.has_value()) {
UNWRAP_STATUS(some_columns_.Execute(
- {std::string(schema), std::string(table),
std::string(*column_filter)}))
- UNWRAP_STATUS(some_constraints_.Execute(
- {std::string(schema), std::string(table),
std::string(*column_filter)}))
+ {std::string(schema), std::string(table),
std::string(*column_filter)}));
next_column_ = some_columns_.Row(-1);
- next_constraint_ = some_constraints_.Row(-1);
} else {
- UNWRAP_STATUS(all_columns_.Execute({std::string(schema),
std::string(table)}))
- UNWRAP_STATUS(all_constraints_.Execute({std::string(schema),
std::string(table)}))
+ UNWRAP_STATUS(all_columns_.Execute({std::string(schema),
std::string(table)}));
next_column_ = all_columns_.Row(-1);
- next_constraint_ = all_constraints_.Row(-1);
+ }
+
+ if (enable_constraints_) {
+ if (column_filter.has_value()) {
+ UNWRAP_STATUS(some_constraints_.Execute(
+ {std::string(schema), std::string(table),
std::string(*column_filter)}))
+ next_constraint_ = some_constraints_.Row(-1);
+ } else {
+ UNWRAP_STATUS(
+ all_constraints_.Execute({std::string(schema),
std::string(table)}));
+ next_constraint_ = all_constraints_.Row(-1);
+ }
}
return Status::Ok();
@@ -348,6 +363,9 @@ class PostgresGetObjectsHelper : public
adbc::driver::GetObjectsHelper {
PqResultHelper all_constraints_;
PqResultHelper some_constraints_;
+ // On Redshift, the constraints query fails
+ bool enable_constraints_{true};
+
// Iterator state for the catalogs/schema/table/column queries
PqResultRow next_catalog_;
PqResultRow next_schema_;
@@ -478,19 +496,30 @@ AdbcStatusCode PostgresConnection::GetInfo(struct
AdbcConnection* connection,
for (size_t i = 0; i < info_codes_length; i++) {
switch (info_codes[i]) {
case ADBC_INFO_VENDOR_NAME:
- infos.push_back({info_codes[i], "PostgreSQL"});
+ infos.push_back({info_codes[i], std::string(VendorName())});
break;
case ADBC_INFO_VENDOR_VERSION: {
- const char* stmt = "SHOW server_version_num";
- auto result_helper = PqResultHelper{conn_, std::string(stmt)};
- RAISE_STATUS(error, result_helper.Execute());
- auto it = result_helper.begin();
- if (it == result_helper.end()) {
- SetError(error, "[libpq] PostgreSQL returned no rows for '%s'",
stmt);
- return ADBC_STATUS_INTERNAL;
+ if (VendorName() == "Redshift") {
+ const std::array<int, 3>& version = VendorVersion();
+ std::string version_string = std::to_string(version[0]) + "." +
+ std::to_string(version[1]) + "." +
+ std::to_string(version[2]);
+ infos.push_back({info_codes[i], std::move(version_string)});
+
+ } else {
+ // Gives a version in the form 140000 instead of 14.0.0
+ const char* stmt = "SHOW server_version_num";
+ auto result_helper = PqResultHelper{conn_, std::string(stmt)};
+ RAISE_STATUS(error, result_helper.Execute());
+ auto it = result_helper.begin();
+ if (it == result_helper.end()) {
+ SetError(error, "[libpq] PostgreSQL returned no rows for '%s'",
stmt);
+ return ADBC_STATUS_INTERNAL;
+ }
+ const char* server_version_num = (*it)[0].data;
+ infos.push_back({info_codes[i], server_version_num});
}
- const char* server_version_num = (*it)[0].data;
- infos.push_back({info_codes[i], server_version_num});
+
break;
}
case ADBC_INFO_DRIVER_NAME:
@@ -520,7 +549,8 @@ AdbcStatusCode PostgresConnection::GetObjects(
struct AdbcConnection* connection, int c_depth, const char* catalog,
const char* db_schema, const char* table_name, const char** table_type,
const char* column_name, struct ArrowArrayStream* out, struct AdbcError*
error) {
- PostgresGetObjectsHelper new_helper(conn_);
+ PostgresGetObjectsHelper helper(conn_);
+ helper.SetEnableConstraints(VendorName() != "Redshift");
const auto catalog_filter =
catalog ? std::make_optional(std::string_view(catalog)) : std::nullopt;
@@ -559,9 +589,9 @@ AdbcStatusCode PostgresConnection::GetObjects(
.ToAdbc(error);
}
- auto status = BuildGetObjects(&new_helper, depth, catalog_filter,
schema_filter,
+ auto status = BuildGetObjects(&helper, depth, catalog_filter, schema_filter,
table_filter, column_filter,
table_type_filter, out);
- RAISE_STATUS(error, new_helper.Close());
+ RAISE_STATUS(error, helper.Close());
RAISE_STATUS(error, status);
return ADBC_STATUS_OK;
@@ -573,11 +603,12 @@ AdbcStatusCode PostgresConnection::GetOption(const char*
option, char* value,
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) ==
0) {
- PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"};
+ PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA()"};
RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
- SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
+ SetError(error,
+ "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA()'");
return ADBC_STATUS_INTERNAL;
}
output = (*it)[0].data;
@@ -989,7 +1020,6 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const
char* catalog,
CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(uschema.get(),
result_helper.NumRows()),
error);
- ArrowError na_error;
int row_counter = 0;
for (auto row : result_helper) {
const char* colname = row[0].data;
@@ -997,14 +1027,15 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const
char* catalog,
static_cast<uint32_t>(std::strtol(row[1].data, /*str_end=*/nullptr,
/*base=*/10));
PostgresType pg_type;
- if (type_resolver_->Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
- SetError(error, "%s%d%s%s%s%" PRIu32, "Column #", row_counter + 1, "
(\"", colname,
- "\") has unknown type code ", pg_oid);
+ if (type_resolver_->FindWithDefault(pg_oid, &pg_type) != NANOARROW_OK) {
+ SetError(error, "%s%d%s%s%s%" PRIu32, "Error resolving type code for
column #",
+ row_counter + 1, " (\"", colname, "\") with oid ", pg_oid);
final_status = ADBC_STATUS_NOT_IMPLEMENTED;
break;
}
CHECK_NA(INTERNAL,
-
pg_type.WithFieldName(colname).SetSchema(uschema->children[row_counter]),
+
pg_type.WithFieldName(colname).SetSchema(uschema->children[row_counter],
+
std::string(VendorName())),
error);
row_counter++;
}
@@ -1136,4 +1167,10 @@ AdbcStatusCode PostgresConnection::SetOptionInt(const
char* key, int64_t value,
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+std::string_view PostgresConnection::VendorName() { return
database_->VendorName(); }
+
+const std::array<int, 3>& PostgresConnection::VendorVersion() {
+ return database_->VendorVersion();
+}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h
index 787a7dcda..7683875b5 100644
--- a/c/driver/postgresql/connection.h
+++ b/c/driver/postgresql/connection.h
@@ -17,6 +17,7 @@
#pragma once
+#include <array>
#include <cstdint>
#include <memory>
@@ -73,6 +74,8 @@ class PostgresConnection {
return type_resolver_;
}
bool autocommit() const { return autocommit_; }
+ std::string_view VendorName();
+ const std::array<int, 3>& VendorVersion();
private:
std::shared_ptr<PostgresDatabase> database_;
diff --git a/c/driver/postgresql/copy/postgres_copy_reader_test.cc
b/c/driver/postgresql/copy/postgres_copy_reader_test.cc
index 60e0b6aaf..7b9fe230f 100644
--- a/c/driver/postgresql/copy/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/copy/postgres_copy_reader_test.cc
@@ -27,7 +27,7 @@ class PostgresCopyStreamTester {
public:
ArrowErrorCode Init(const PostgresType& root_type, ArrowError* error =
nullptr) {
NANOARROW_RETURN_NOT_OK(reader_.Init(root_type));
- NANOARROW_RETURN_NOT_OK(reader_.InferOutputSchema(error));
+ NANOARROW_RETURN_NOT_OK(reader_.InferOutputSchema("PostgreSQL Tester",
error));
NANOARROW_RETURN_NOT_OK(reader_.InitFieldReaders(error));
return NANOARROW_OK;
}
diff --git a/c/driver/postgresql/copy/reader.h
b/c/driver/postgresql/copy/reader.h
index 983f39226..07f91d545 100644
--- a/c/driver/postgresql/copy/reader.h
+++ b/c/driver/postgresql/copy/reader.h
@@ -972,10 +972,11 @@ class PostgresCopyStreamReader {
return NANOARROW_OK;
}
- ArrowErrorCode InferOutputSchema(ArrowError* error) {
+ ArrowErrorCode InferOutputSchema(const std::string& vendor_name, ArrowError*
error) {
schema_.reset();
ArrowSchemaInit(schema_.get());
- NANOARROW_RETURN_NOT_OK(root_reader_.InputType().SetSchema(schema_.get()));
+ NANOARROW_RETURN_NOT_OK(
+ root_reader_.InputType().SetSchema(schema_.get(), vendor_name));
return NANOARROW_OK;
}
diff --git a/c/driver/postgresql/database.cc b/c/driver/postgresql/database.cc
index 97242ad58..cdbad7535 100644
--- a/c/driver/postgresql/database.cc
+++ b/c/driver/postgresql/database.cc
@@ -17,6 +17,8 @@
#include "database.h"
+#include <array>
+#include <charconv>
#include <cinttypes>
#include <cstring>
#include <memory>
@@ -28,6 +30,7 @@
#include <nanoarrow/nanoarrow.h>
#include "driver/common/utils.h"
+#include "result_helper.h"
namespace adbcpq {
@@ -54,8 +57,19 @@ AdbcStatusCode PostgresDatabase::GetOptionDouble(const char*
option, double* val
}
AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
- // Connect to validate the parameters.
- return RebuildTypeResolver(error);
+ // Connect to initialize the version information and build the type table
+ PGconn* conn = nullptr;
+ RAISE_ADBC(Connect(&conn, error));
+
+ Status status = InitVersions(conn);
+ if (!status.ok()) {
+ RAISE_ADBC(Disconnect(&conn, nullptr));
+ return status.ToAdbc(error);
+ }
+
+ status = RebuildTypeResolver(conn);
+ RAISE_ADBC(Disconnect(&conn, nullptr));
+ return status.ToAdbc(error);
}
AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
@@ -123,20 +137,87 @@ AdbcStatusCode PostgresDatabase::Disconnect(PGconn**
conn, struct AdbcError* err
return ADBC_STATUS_OK;
}
-// Helpers for building the type resolver from queries
-static inline int32_t InsertPgAttributeResult(
- PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
+namespace {
+
+// Parse an individual version in the form of "xxx.xxx.xxx".
+// If the version components aren't numeric, they will be zero.
+std::array<int, 3> ParseVersion(std::string_view version) {
+ std::array<int, 3> out{};
+ size_t component = 0;
+ size_t component_begin = 0;
+ size_t component_end = 0;
+
+ // While there are remaining version components and we haven't reached the
end of the
+ // string
+ while (component_begin < version.size() && component < out.size()) {
+ // Find the next character that marks a version component separation or
the end of the
+ // string
+ component_end = version.find_first_of(".-", component_begin);
+ if (component_end == version.npos) {
+ component_end = version.size();
+ }
-static inline int32_t InsertPgTypeResult(
- PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
+ // Try to parse the component as an integer (assigning zero if this fails)
+ int value = 0;
+ std::from_chars(version.data() + component_begin, version.data() +
component_end,
+ value);
+ out[component] = value;
-AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
- PGconn* conn = nullptr;
- AdbcStatusCode final_status = Connect(&conn, error);
- if (final_status != ADBC_STATUS_OK) {
- return final_status;
+ // Move on to the next component
+ component_begin = component_end + 1;
+ component_end = component_begin;
+ component++;
+ }
+
+ return out;
+}
+
+// Parse the PostgreSQL version() string that looks like:
+// PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2
20041017 (Red
+// Hat 3.4.2-6.fc3), Redshift 1.0.77467
+std::array<int, 3> ParsePrefixedVersion(std::string_view version_info,
+ std::string_view prefix) {
+ size_t pos = version_info.find(prefix);
+ if (pos == version_info.npos) {
+ return {0, 0, 0};
}
+ // Skip the prefix and any leading whitespace
+ pos = version_info.find_first_not_of(' ', pos + prefix.size());
+ if (pos == version_info.npos) {
+ return {0, 0, 0};
+ }
+
+ return ParseVersion(version_info.substr(pos));
+}
+
+} // namespace
+
+Status PostgresDatabase::InitVersions(PGconn* conn) {
+ PqResultHelper helper(conn, "SELECT version();");
+ UNWRAP_STATUS(helper.Execute());
+ if (helper.NumRows() != 1 || helper.NumColumns() != 1) {
+ return Status::Internal("Expected 1 row and 1 column for SELECT version();
but got ",
+ helper.NumRows(), "/", helper.NumColumns());
+ }
+
+ std::string_view version_info = helper.Row(0)[0].value();
+ postgres_server_version_ = ParsePrefixedVersion(version_info, "PostgreSQL");
+ redshift_server_version_ = ParsePrefixedVersion(version_info, "Redshift");
+
+ return Status::Ok();
+}
+
+// Helpers for building the type resolver from queries
+static std::string BuildPgTypeQuery(bool has_typarray);
+
+static Status InsertPgAttributeResult(
+ const PqResultHelper& result, const std::shared_ptr<PostgresTypeResolver>&
resolver);
+
+static Status InsertPgTypeResult(const PqResultHelper& result,
+ const std::shared_ptr<PostgresTypeResolver>&
resolver);
+
+Status PostgresDatabase::RebuildTypeResolver(PGconn* conn) {
// We need a few queries to build the resolver. The current strategy might
// fail for some recursive definitions (e.g., arrays of records of arrays).
// First, one on the pg_attribute table to resolve column names/oids for
@@ -156,147 +237,131 @@ ORDER BY
// recursive definitions (e.g., record types with array column). This
currently won't
// handle range types because those rows don't have child OID information.
Arrays types
// are inserted after a successful insert of the element type.
- const std::string kTypeQuery = R"(
-SELECT
- oid,
- typname,
- typreceive,
- typbasetype,
- typarray,
- typrelid
-FROM
- pg_catalog.pg_type
-WHERE
- (typreceive != 0 OR typname = 'aclitem') AND typtype != 'r' AND
typreceive::TEXT != 'array_recv'
-ORDER BY
- oid
-)";
+ std::string type_query =
+ BuildPgTypeQuery(/*has_typarray*/ redshift_server_version_[0] == 0);
// Create a new type resolver (this instance's type_resolver_ member
// will be updated at the end if this succeeds).
auto resolver = std::make_shared<PostgresTypeResolver>();
// Insert record type definitions (this includes table schemas)
- PGresult* result = PQexec(conn, kColumnsQuery.c_str());
- ExecStatusType pq_status = PQresultStatus(result);
- if (pq_status == PGRES_TUPLES_OK) {
- InsertPgAttributeResult(result, resolver);
- } else {
- SetError(error, "%s%s",
- "[libpq] Failed to build type mapping table: ",
PQerrorMessage(conn));
- final_status = ADBC_STATUS_IO;
- }
-
- PQclear(result);
+ PqResultHelper columns(conn, kColumnsQuery.c_str());
+ UNWRAP_STATUS(columns.Execute());
+ UNWRAP_STATUS(InsertPgAttributeResult(columns, resolver));
// Attempt filling the resolver a few times to handle recursive definitions.
int32_t max_attempts = 3;
+ PqResultHelper types(conn, type_query);
for (int32_t i = 0; i < max_attempts; i++) {
- result = PQexec(conn, kTypeQuery.c_str());
- ExecStatusType pq_status = PQresultStatus(result);
- if (pq_status == PGRES_TUPLES_OK) {
- InsertPgTypeResult(result, resolver);
- } else {
- SetError(error, "%s%s",
- "[libpq] Failed to build type mapping table: ",
PQerrorMessage(conn));
- final_status = ADBC_STATUS_IO;
- }
-
- PQclear(result);
- if (final_status != ADBC_STATUS_OK) {
- break;
- }
+ UNWRAP_STATUS(types.Execute());
+ UNWRAP_STATUS(InsertPgTypeResult(types, resolver));
}
- // Disconnect since PostgreSQL connections can be heavy.
- {
- AdbcStatusCode status = Disconnect(&conn, error);
- if (status != ADBC_STATUS_OK) final_status = status;
- }
+ type_resolver_ = std::move(resolver);
+ return Status::Ok();
+}
- if (final_status == ADBC_STATUS_OK) {
- type_resolver_ = std::move(resolver);
+static std::string BuildPgTypeQuery(bool has_typarray) {
+ std::string maybe_typarray_col;
+ std::string maybe_array_recv_filter;
+ if (has_typarray) {
+ maybe_typarray_col = ", typarray";
+ maybe_array_recv_filter = "AND typreceive::TEXT != 'array_recv'";
}
- return final_status;
+ return std::string() + "SELECT oid, typname, typreceive, typbasetype,
typrelid" +
+ maybe_typarray_col + " FROM pg_catalog.pg_type " +
+ " WHERE (typreceive != 0 OR typsend != 0) AND typtype != 'r' " +
+ maybe_array_recv_filter;
}
-static inline int32_t InsertPgAttributeResult(
- PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
- int num_rows = PQntuples(result);
+static Status InsertPgAttributeResult(
+ const PqResultHelper& result, const std::shared_ptr<PostgresTypeResolver>&
resolver) {
+ int num_rows = result.NumRows();
std::vector<std::pair<std::string, uint32_t>> columns;
- uint32_t current_type_oid = 0;
- int32_t n_added = 0;
+ int64_t current_type_oid = 0;
+
+ if (result.NumColumns() != 3) {
+ return Status::Internal(
+ "Expected 3 columns from type resolver pg_attribute query but got ",
+ result.NumColumns());
+ }
for (int row = 0; row < num_rows; row++) {
- const uint32_t type_oid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr,
/*base=*/10));
- const char* col_name = PQgetvalue(result, row, 1);
- const uint32_t col_oid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 2), /*str_end=*/nullptr,
/*base=*/10));
+ PqResultRow item = result.Row(row);
+ UNWRAP_RESULT(int64_t type_oid, item[0].ParseInteger());
+ std::string_view col_name = item[1].value();
+ UNWRAP_RESULT(int64_t col_oid, item[2].ParseInteger());
if (type_oid != current_type_oid && !columns.empty()) {
resolver->InsertClass(current_type_oid, columns);
columns.clear();
current_type_oid = type_oid;
- n_added++;
}
- columns.push_back({col_name, col_oid});
+ columns.push_back({std::string(col_name), static_cast<uint32_t>(col_oid)});
}
if (!columns.empty()) {
- resolver->InsertClass(current_type_oid, columns);
- n_added++;
+ resolver->InsertClass(static_cast<uint32_t>(current_type_oid), columns);
}
- return n_added;
+ return Status::Ok();
}
-static inline int32_t InsertPgTypeResult(
- PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
- int num_rows = PQntuples(result);
- PostgresTypeResolver::Item item;
- int32_t n_added = 0;
+static Status InsertPgTypeResult(const PqResultHelper& result,
+ const std::shared_ptr<PostgresTypeResolver>&
resolver) {
+ if (result.NumColumns() != 5 && result.NumColumns() != 6) {
+ return Status::Internal(
+ "Expected 5 or 6 columns from type resolver pg_type query but got ",
+ result.NumColumns());
+ }
+
+ int num_rows = result.NumRows();
+ int num_cols = result.NumColumns();
+ PostgresTypeResolver::Item type_item;
for (int row = 0; row < num_rows; row++) {
- const uint32_t oid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr,
/*base=*/10));
- const char* typname = PQgetvalue(result, row, 1);
- const char* typreceive = PQgetvalue(result, row, 2);
- const uint32_t typbasetype = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 3), /*str_end=*/nullptr,
/*base=*/10));
- const uint32_t typarray = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 4), /*str_end=*/nullptr,
/*base=*/10));
- const uint32_t typrelid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 5), /*str_end=*/nullptr,
/*base=*/10));
+ PqResultRow item = result.Row(row);
+ UNWRAP_RESULT(int64_t oid, item[0].ParseInteger());
+ const char* typname = item[1].data;
+ const char* typreceive = item[2].data;
+ UNWRAP_RESULT(int64_t typbasetype, item[3].ParseInteger());
+ UNWRAP_RESULT(int64_t typrelid, item[4].ParseInteger());
+
+ int64_t typarray;
+ if (num_cols == 6) {
+ UNWRAP_RESULT(typarray, item[5].ParseInteger());
+ } else {
+ typarray = 0;
+ }
// Special case the aclitem because it shows up in a bunch of internal
tables
if (strcmp(typname, "aclitem") == 0) {
typreceive = "aclitem_recv";
}
- item.oid = oid;
- item.typname = typname;
- item.typreceive = typreceive;
- item.class_oid = typrelid;
- item.base_oid = typbasetype;
+ type_item.oid = static_cast<uint32_t>(oid);
+ type_item.typname = typname;
+ type_item.typreceive = typreceive;
+ type_item.class_oid = static_cast<uint32_t>(typrelid);
+ type_item.base_oid = static_cast<uint32_t>(typbasetype);
- int result = resolver->Insert(item, nullptr);
+ int result = resolver->Insert(type_item, nullptr);
// If there's an array type and the insert succeeded, add that now too
if (result == NANOARROW_OK && typarray != 0) {
std::string array_typname = "_" + std::string(typname);
- item.oid = typarray;
- item.typname = array_typname.c_str();
- item.typreceive = "array_recv";
- item.child_oid = oid;
+ type_item.oid = typarray;
+ type_item.typname = array_typname.c_str();
+ type_item.typreceive = "array_recv";
+ type_item.child_oid = static_cast<uint32_t>(oid);
- resolver->Insert(item, nullptr);
+ resolver->Insert(type_item, nullptr);
}
}
- return n_added;
+ return Status::Ok();
}
} // namespace adbcpq
diff --git a/c/driver/postgresql/database.h b/c/driver/postgresql/database.h
index d246ea04a..e0a00267e 100644
--- a/c/driver/postgresql/database.h
+++ b/c/driver/postgresql/database.h
@@ -17,6 +17,7 @@
#pragma once
+#include <array>
#include <cstdint>
#include <memory>
#include <string>
@@ -24,9 +25,12 @@
#include <arrow-adbc/adbc.h>
#include <libpq-fe.h>
+#include "driver/framework/status.h"
#include "postgres_type.h"
namespace adbcpq {
+using adbc::driver::Status;
+
class PostgresDatabase {
public:
PostgresDatabase();
@@ -58,12 +62,29 @@ class PostgresDatabase {
return type_resolver_;
}
- AdbcStatusCode RebuildTypeResolver(struct AdbcError* error);
+ Status InitVersions(PGconn* conn);
+ Status RebuildTypeResolver(PGconn* conn);
+ std::string_view VendorName() {
+ if (redshift_server_version_[0] != 0) {
+ return "Redshift";
+ } else {
+ return "PostgreSQL";
+ }
+ }
+ const std::array<int, 3>& VendorVersion() {
+ if (redshift_server_version_[0] != 0) {
+ return redshift_server_version_;
+ } else {
+ return postgres_server_version_;
+ }
+ }
private:
int32_t open_connections_;
std::string uri_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
+ std::array<int, 3> postgres_server_version_{};
+ std::array<int, 3> redshift_server_version_{};
};
} // namespace adbcpq
diff --git a/c/driver/postgresql/postgres_type.h
b/c/driver/postgresql/postgres_type.h
index b3cfc209f..d2a535629 100644
--- a/c/driver/postgresql/postgres_type.h
+++ b/c/driver/postgresql/postgres_type.h
@@ -111,7 +111,11 @@ enum class PostgresTypeId {
kXid8,
kXid,
kXml,
- kUserDefined
+ kUserDefined,
+ // This is not an actual type, but there are cases where all we have is an
Oid
+ // that was not inserted into the type resolver. We can't use "unknown" or
"opaque"
+ // or "void" because those names show up in actual pg_type tables.
+ kUnnamedArrowOpaque
};
// Returns the receive function name as defined in the typrecieve column
@@ -139,6 +143,11 @@ class PostgresType {
PostgresType() : PostgresType(PostgresTypeId::kUninitialized) {}
+ static PostgresType Unnamed(uint32_t oid) {
+ return PostgresType(PostgresTypeId::kUnnamedArrowOpaque)
+ .WithPgTypeInfo(oid, "unnamed<oid:" + std::to_string(oid) + ">");
+ }
+
void AppendChild(const std::string& field_name, const PostgresType& type) {
PostgresType child(type);
children_.push_back(child.WithFieldName(field_name));
@@ -204,7 +213,8 @@ class PostgresType {
// do not have a corresponding Arrow type are returned as Binary with field
// metadata ADBC:posgresql:typname. These types can be represented as their
// binary COPY representation in the output.
- ArrowErrorCode SetSchema(ArrowSchema* schema) const {
+ ArrowErrorCode SetSchema(ArrowSchema* schema,
+ const std::string& vendor_name = "PostgreSQL")
const {
switch (type_id_) {
// ---- Primitive types --------------------
case PostgresTypeId::kBool:
@@ -235,7 +245,7 @@ class PostgresType {
// ---- Numeric/Decimal-------------------
case PostgresTypeId::kNumeric:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema,
NANOARROW_TYPE_STRING));
- NANOARROW_RETURN_NOT_OK(AddPostgresTypeMetadata(schema));
+ NANOARROW_RETURN_NOT_OK(AddPostgresTypeMetadata(schema, vendor_name));
break;
@@ -290,13 +300,14 @@ class PostgresType {
case PostgresTypeId::kRecord:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(schema,
n_children()));
for (int64_t i = 0; i < n_children(); i++) {
- NANOARROW_RETURN_NOT_OK(children_[i].SetSchema(schema->children[i]));
+ NANOARROW_RETURN_NOT_OK(
+ children_[i].SetSchema(schema->children[i], vendor_name));
}
break;
case PostgresTypeId::kArray:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema,
NANOARROW_TYPE_LIST));
- NANOARROW_RETURN_NOT_OK(children_[0].SetSchema(schema->children[0]));
+ NANOARROW_RETURN_NOT_OK(children_[0].SetSchema(schema->children[0],
vendor_name));
break;
case PostgresTypeId::kUserDefined:
@@ -305,7 +316,7 @@ class PostgresType {
// can still return the bytes postgres gives us and attach the type
name as
// metadata
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema,
NANOARROW_TYPE_BINARY));
- NANOARROW_RETURN_NOT_OK(AddPostgresTypeMetadata(schema));
+ NANOARROW_RETURN_NOT_OK(AddPostgresTypeMetadata(schema, vendor_name));
break;
}
@@ -329,7 +340,8 @@ class PostgresType {
static constexpr const char* kOpaqueExtensionName = "arrow.opaque";
static constexpr const char* kExtensionMetadata = "ARROW:extension:metadata";
- ArrowErrorCode AddPostgresTypeMetadata(ArrowSchema* schema) const {
+ ArrowErrorCode AddPostgresTypeMetadata(ArrowSchema* schema,
+ const std::string& vendor_name) const
{
// the typname_ may not always be set: an instance of this class can be
// created with just the type id. That's why there is this here fallback to
// resolve the type name of built-in types.
@@ -346,7 +358,7 @@ class PostgresType {
// Add the Opaque extension type metadata
std::string metadata = R"({"type_name": ")";
metadata += typname;
- metadata += R"(", "vendor_name": "PostgreSQL"})";
+ metadata += R"(", "vendor_name": ")" + vendor_name + R"("})";
NANOARROW_RETURN_NOT_OK(
ArrowMetadataBuilderAppend(buffer.get(), ArrowCharView(kExtensionName),
ArrowCharView(kOpaqueExtensionName)));
@@ -395,7 +407,18 @@ class PostgresTypeResolver {
return EINVAL;
}
- *type_out = (*result).second;
+ *type_out = result->second;
+ return NANOARROW_OK;
+ }
+
+ ArrowErrorCode FindWithDefault(uint32_t oid, PostgresType* type_out) {
+ auto result = mapping_.find(oid);
+ if (result == mapping_.end()) {
+ *type_out = PostgresType::Unnamed(oid);
+ } else {
+ *type_out = result->second;
+ }
+
return NANOARROW_OK;
}
diff --git a/c/driver/postgresql/postgres_type_test.cc
b/c/driver/postgresql/postgres_type_test.cc
index 2e713204f..2c76f4c1f 100644
--- a/c/driver/postgresql/postgres_type_test.cc
+++ b/c/driver/postgresql/postgres_type_test.cc
@@ -337,6 +337,11 @@ TEST(PostgresTypeTest, PostgresTypeResolver) {
EXPECT_EQ(resolver.Find(123, &type, &error), EINVAL);
EXPECT_STREQ(ArrowErrorMessage(&error), "Postgres type with oid 123 not
found");
+ EXPECT_EQ(resolver.FindWithDefault(123, &type), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 123);
+ EXPECT_EQ(type.type_id(), PostgresTypeId::kUnnamedArrowOpaque);
+ EXPECT_EQ(type.typname(), "unnamed<oid:123>");
+
// Check error for Array with unknown child
item.oid = 123;
item.typname = "some_array";
diff --git a/c/driver/postgresql/result_reader.cc
b/c/driver/postgresql/result_reader.cc
index c350ab8a3..464bad74a 100644
--- a/c/driver/postgresql/result_reader.cc
+++ b/c/driver/postgresql/result_reader.cc
@@ -174,10 +174,10 @@ Status PqResultArrayReader::Initialize(int64_t*
rows_affected) {
for (int i = 0; i < helper_.NumColumns(); i++) {
PostgresType child_type;
- UNWRAP_NANOARROW(na_error_, Internal,
- type_resolver_->Find(helper_.FieldType(i), &child_type,
&na_error_));
+ UNWRAP_ERRNO(Internal,
+ type_resolver_->FindWithDefault(helper_.FieldType(i),
&child_type));
- UNWRAP_ERRNO(Internal, child_type.SetSchema(schema_->children[i]));
+ UNWRAP_ERRNO(Internal, child_type.SetSchema(schema_->children[i],
vendor_name_));
UNWRAP_ERRNO(Internal,
ArrowSchemaSetName(schema_->children[i],
helper_.FieldName(i)));
diff --git a/c/driver/postgresql/result_reader.h
b/c/driver/postgresql/result_reader.h
index 5c36dccb2..90b35baf0 100644
--- a/c/driver/postgresql/result_reader.h
+++ b/c/driver/postgresql/result_reader.h
@@ -58,6 +58,10 @@ class PqResultArrayReader {
bind_stream_->SetBind(stream);
}
+ void SetVendorName(std::string_view vendor_name) {
+ vendor_name_ = std::string(vendor_name);
+ }
+
int GetSchema(struct ArrowSchema* out);
int GetNext(struct ArrowArray* out);
const char* GetLastError();
@@ -74,6 +78,7 @@ class PqResultArrayReader {
std::vector<std::unique_ptr<PostgresCopyFieldReader>> field_readers_;
nanoarrow::UniqueSchema schema_;
bool autocommit_;
+ std::string vendor_name_;
struct AdbcError error_;
struct ArrowError na_error_;
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 32558b494..129ddebff 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -459,6 +459,7 @@ AdbcStatusCode PostgresStatement::ExecuteBind(struct
ArrowArrayStream* stream,
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
reader.SetAutocommit(connection_->autocommit());
reader.SetBind(&bind_);
+ reader.SetVendorName(connection_->VendorName());
RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
@@ -485,8 +486,9 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
// If we have been requested to avoid COPY or there is no output requested,
// execute using the PqResultArrayReader.
- if (!stream || !use_copy_) {
+ if (!stream || !UseCopy()) {
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+ reader.SetVendorName(connection_->VendorName());
RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
@@ -505,6 +507,7 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
if (root_type.n_children() == 0) {
// Could/should move the helper into the reader instead of repreparing
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+ reader.SetVendorName(connection_->VendorName());
RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
}
@@ -512,8 +515,10 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
struct ArrowError na_error;
reader_.copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
CHECK_NA(INTERNAL, reader_.copy_reader_->Init(root_type), error);
- CHECK_NA_DETAIL(INTERNAL,
reader_.copy_reader_->InferOutputSchema(&na_error), &na_error,
- error);
+ CHECK_NA_DETAIL(INTERNAL,
+ reader_.copy_reader_->InferOutputSchema(
+ std::string(connection_->VendorName()), &na_error),
+ &na_error, error);
CHECK_NA_DETAIL(INTERNAL, reader_.copy_reader_->InitFieldReaders(&na_error),
&na_error,
error);
@@ -574,7 +579,9 @@ AdbcStatusCode PostgresStatement::ExecuteSchema(struct
ArrowSchema* schema,
nanoarrow::UniqueSchema tmp;
ArrowSchemaInit(tmp.get());
- CHECK_NA(INTERNAL, output_type.SetSchema(tmp.get()), error);
+ CHECK_NA(INTERNAL,
+ output_type.SetSchema(tmp.get(),
std::string(connection_->VendorName())),
+ error);
tmp.move(schema);
return ADBC_STATUS_OK;
@@ -597,11 +604,12 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct
ArrowArrayStream* stream,
// This is a little unfortunate; we need another DB roundtrip
std::string current_schema;
{
- PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA"};
+ PqResultHelper result_helper{connection_->conn(), "SELECT
CURRENT_SCHEMA()"};
RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
- SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
+ SetError(error,
+ "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA()'");
return ADBC_STATUS_INTERNAL;
}
current_schema = (*it)[0].data;
@@ -666,7 +674,7 @@ AdbcStatusCode PostgresStatement::GetOption(const char*
key, char* value, size_t
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) ==
0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
- if (use_copy_) {
+ if (UseCopy()) {
result = "true";
} else {
result = "false";
@@ -838,4 +846,12 @@ void PostgresStatement::ClearResult() {
reader_.Release();
}
+int PostgresStatement::UseCopy() {
+ if (use_copy_ == -1) {
+ return connection_->VendorName() != "Redshift";
+ } else {
+ return use_copy_;
+ }
+}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 9e79f41ed..60ada992b 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -97,7 +97,7 @@ class PostgresStatement {
: connection_(nullptr),
query_(),
prepared_(false),
- use_copy_(true),
+ use_copy_(-1),
reader_(nullptr) {
std::memset(&bind_, 0, sizeof(bind_));
}
@@ -161,7 +161,7 @@ class PostgresStatement {
};
// Options
- bool use_copy_;
+ int use_copy_;
struct {
std::string db_schema;
@@ -171,5 +171,7 @@ class PostgresStatement {
} ingest_;
TupleReader reader_;
+
+ int UseCopy();
};
} // namespace adbcpq
diff --git a/docs/source/driver/postgresql.rst
b/docs/source/driver/postgresql.rst
index dda5e108d..8c54ee7b0 100644
--- a/docs/source/driver/postgresql.rst
+++ b/docs/source/driver/postgresql.rst
@@ -35,6 +35,13 @@ overall approach.
Performance/optimization and support for complex types and
different ADBC features is still ongoing.
+.. note:: AWS Redshift supports a very old version of the PostgreSQL
+ wire protocol and has a basic level of support in the ADBC
+ PostgreSQL driver. Because Redshift does not support reading or
+ writing COPY in PostgreSQL binary format, the optimizations that
+ accelerate non-Redshift queries are not enabled when connecting
+ to a Redshift database. This functionality is experimental.
+
Installation
============