This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new d56915d refactor(c/driver/postgresql): Factor out Postgres type
abstraction and test it independently of the driver (#573)
d56915d is described below
commit d56915db7f806f598dae6815796eea5764a1d3a9
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Apr 20 10:00:41 2023 -0400
refactor(c/driver/postgresql): Factor out Postgres type abstraction and
test it independently of the driver (#573)
The existing `type.h`/`type.cc` provides a limited abstraction of the
postgres type system; however, eventually this driver will need to deal
with range, domain, array, and record types. This PR factors out
existing behaviour and includes a `SetSchema()` method to provide the
default mapping from the postgres -> arrow type systems. This PR doesn't
provide full type support but connects enough wires that future support
for these types is more straightforward.
Many commits on this branch also implemented a COPY -> Arrow converter
abstraction that can handle range, domain, array, and record types. I
moved that to a separate branch
(https://github.com/paleolimbot/arrow-adbc/pull/1) because its scope is
orthogonal to this one. If that's of interest, I'll continue with that
too (which would add support for Array, Domain, and date/time types in
results).
---------
Co-authored-by: David Li <[email protected]>
---
.github/workflows/native-unix.yml | 1 -
c/driver/postgresql/CMakeLists.txt | 2 +-
c/driver/postgresql/CMakeUserPresets.example.json | 33 +
c/driver/postgresql/README.md | 13 +
c/driver/postgresql/connection.cc | 2 +-
c/driver/postgresql/connection.h | 8 +-
c/driver/postgresql/database.cc | 220 +++++-
c/driver/postgresql/database.h | 9 +-
c/driver/postgresql/postgres_type.h | 897 ++++++++++++++++++++++
c/driver/postgresql/postgres_type_test.cc | 405 ++++++++++
c/driver/postgresql/postgresql.cc | 2 -
c/driver/postgresql/statement.cc | 86 +--
c/driver/postgresql/statement.h | 4 +-
c/driver/postgresql/type.cc | 92 ---
c/driver/postgresql/type.h | 63 --
c/driver/postgresql/util.h | 8 +
docker-compose.yml | 2 +-
r/adbcpostgresql/bootstrap.R | 12 +-
r/adbcpostgresql/src/.gitignore | 4 +-
r/adbcpostgresql/src/Makevars.in | 1 -
r/adbcpostgresql/src/Makevars.ucrt | 1 -
r/adbcpostgresql/src/Makevars.win | 1 -
r/adbcpostgresql/src/database.cc | 124 ---
r/adbcpostgresql/src/nanoarrow/.gitignore | 1 +
r/adbcpostgresql/src/postgresql.cc | 499 ------------
25 files changed, 1588 insertions(+), 902 deletions(-)
diff --git a/.github/workflows/native-unix.yml
b/.github/workflows/native-unix.yml
index ee3c5e9..82c99bd 100644
--- a/.github/workflows/native-unix.yml
+++ b/.github/workflows/native-unix.yml
@@ -501,7 +501,6 @@ jobs:
- uses: r-lib/actions/setup-r-dependencies@v2
with:
- pak-version: devel
extra-packages: any::rcmdcheck, local::../adbcdrivermanager
needs: check
working-directory: r/${{ matrix.config.pkg }}
diff --git a/c/driver/postgresql/CMakeLists.txt
b/c/driver/postgresql/CMakeLists.txt
index 4d194f8..e14ea20 100644
--- a/c/driver/postgresql/CMakeLists.txt
+++ b/c/driver/postgresql/CMakeLists.txt
@@ -43,7 +43,6 @@ add_arrow_lib(adbc_driver_postgresql
database.cc
postgresql.cc
statement.cc
- type.cc
OUTPUTS
ADBC_LIBRARIES
CMAKE_PACKAGE_NAME
@@ -77,6 +76,7 @@ if(ADBC_BUILD_TESTS)
PREFIX
adbc
SOURCES
+ postgres_type_test.cc
postgresql_test.cc
../../validation/adbc_validation.cc
../../validation/adbc_validation_util.cc
diff --git a/c/driver/postgresql/CMakeUserPresets.example.json
b/c/driver/postgresql/CMakeUserPresets.example.json
new file mode 100644
index 0000000..986cc5e
--- /dev/null
+++ b/c/driver/postgresql/CMakeUserPresets.example.json
@@ -0,0 +1,33 @@
+{
+ "version": 3,
+ "cmakeMinimumRequired": {
+ "major": 3,
+ "minor": 21,
+ "patch": 0
+ },
+ "configurePresets": [
+ {
+ "name": "user-local",
+ "displayName": "(user) local build",
+ "cacheVariables": {
+ "CMAKE_BUILD_TYPE": "Debug",
+ "ADBC_BUILD_TESTS": "ON"
+ },
+ "environment": {
+ "PKG_CONFIG_PATH": ""
+ }
+ }
+ ],
+ "testPresets": [
+ {
+ "name": "user-test-preset",
+ "description": "",
+ "displayName": "(user) test preset",
+ "configurePreset": "user-local",
+ "environment": {
+ "CTEST_OUTPUT_ON_FAILURE": "1",
+ "ADBC_POSTGRESQL_TEST_URI": "postgresql://localhost:5432/postgres?user=postgres&password=password"
+ }
+ }
+ ]
+}
diff --git a/c/driver/postgresql/README.md b/c/driver/postgresql/README.md
index 15859d0..cc5a3df 100644
--- a/c/driver/postgresql/README.md
+++ b/c/driver/postgresql/README.md
@@ -50,6 +50,15 @@ $ docker run -it --rm \
postgres
```
+Alternatively, use `docker compose` with the `docker-compose.yml` provided by
+ADBC to manage the test database container.
+
+```shell
+$ docker compose up postgres_test
+# When finished:
+# docker compose down postgres_test
+```
+
Then, to run the tests, set the environment variable specifying the
PostgreSQL URI before running tests:
@@ -57,3 +66,7 @@ PostgreSQL URI before running tests:
$ export
ADBC_POSTGRESQL_TEST_URI=postgresql://localhost:5432/postgres?user=postgres&password=password
$ ctest
```
+
+Users of VSCode can use the CMake extension with the supplied
+`CMakeUserPresets.example.json` (copy it to `CMakeUserPresets.json`) to
+supply the CMake cache variables and environment variables required to
+build and run tests.
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index 38cba57..b61ebbe 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -58,7 +58,7 @@ AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase*
database,
}
database_ =
*reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
- type_mapping_ = database_->type_mapping();
+ type_resolver_ = database_->type_resolver();
return database_->Connect(&conn_, error);
}
diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h
index 9105c61..6f63d66 100644
--- a/c/driver/postgresql/connection.h
+++ b/c/driver/postgresql/connection.h
@@ -23,7 +23,7 @@
#include <adbc.h>
#include <libpq-fe.h>
-#include "type.h"
+#include "postgres_type.h"
namespace adbcpq {
class PostgresDatabase;
@@ -41,11 +41,13 @@ class PostgresConnection {
AdbcStatusCode SetOption(const char* key, const char* value, struct
AdbcError* error);
PGconn* conn() const { return conn_; }
- const std::shared_ptr<TypeMapping>& type_mapping() const { return
type_mapping_; }
+ // Type resolver copied from the PostgresDatabase in Init(). This is a
+ // snapshot of the database's shared_ptr: if the database later rebuilds its
+ // resolver, this connection keeps pointing at the resolver it was given.
+ const std::shared_ptr<PostgresTypeResolver>& type_resolver() const {
+ return type_resolver_;
+ }
private:
std::shared_ptr<PostgresDatabase> database_;
- std::shared_ptr<TypeMapping> type_mapping_;
+ // Set once in Init() from database_->type_resolver().
+ std::shared_ptr<PostgresTypeResolver> type_resolver_;
PGconn* conn_;
bool autocommit_;
};
diff --git a/c/driver/postgresql/database.cc b/c/driver/postgresql/database.cc
index bc5e0ec..08ff0fc 100644
--- a/c/driver/postgresql/database.cc
+++ b/c/driver/postgresql/database.cc
@@ -19,6 +19,8 @@
#include <cstring>
#include <memory>
+#include <utility>
+#include <vector>
#include <adbc.h>
#include <libpq-fe.h>
@@ -29,52 +31,13 @@
namespace adbcpq {
+// Starts with a resolver that has no database types registered yet; Init()
+// replaces it with one built from the database catalog.
PostgresDatabase::PostgresDatabase() : open_connections_(0) {
- type_mapping_ = std::make_shared<TypeMapping>();
+ type_resolver_ = std::make_shared<PostgresTypeResolver>();
}
PostgresDatabase::~PostgresDatabase() = default;
+// Validates the connection parameters and (re)builds the type resolver. Both
+// happen in RebuildTypeResolver(), which opens and closes a connection.
AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
- PGconn* conn = nullptr;
- AdbcStatusCode final_status = Connect(&conn, error);
- if (final_status != ADBC_STATUS_OK) {
- return final_status;
- }
-
- // Build the type mapping table.
- const std::string kTypeQuery = R"(
-SELECT
- oid,
- typname,
- typreceive
-FROM
- pg_catalog.pg_type
-)";
-
- pg_result* result = PQexec(conn, kTypeQuery.c_str());
- ExecStatusType pq_status = PQresultStatus(result);
- if (pq_status == PGRES_TUPLES_OK) {
- int num_rows = PQntuples(result);
- for (int row = 0; row < num_rows; row++) {
- const uint32_t oid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr,
/*base=*/10));
- const char* typname = PQgetvalue(result, row, 1);
- const char* typreceive = PQgetvalue(result, row, 2);
-
- type_mapping_->Insert(oid, typname, typreceive);
- }
- } else {
- SetError(error, "Failed to build type mapping table: ",
PQerrorMessage(conn));
- final_status = ADBC_STATUS_IO;
- }
- PQclear(result);
-
- // Disconnect since PostgreSQL connections can be heavy.
- {
- AdbcStatusCode status = Disconnect(&conn, error);
- if (status != ADBC_STATUS_OK) final_status = status;
- }
- return final_status;
+ return RebuildTypeResolver(error);
}
AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
@@ -121,4 +84,179 @@ AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn,
struct AdbcError* err
}
return ADBC_STATUS_OK;
}
+
+// Helpers for building the type resolver from queries
+static inline int32_t InsertPgAttributeResult(
+    pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
+
+static inline int32_t InsertPgTypeResult(
+    pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
+
+// Builds a new PostgresTypeResolver from the pg_attribute and pg_type catalog
+// tables using a temporary connection. type_resolver_ is replaced only if every
+// step succeeds; otherwise the previous resolver is kept, error is populated,
+// and the failing status is returned.
+AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
+  PGconn* conn = nullptr;
+  AdbcStatusCode final_status = Connect(&conn, error);
+  if (final_status != ADBC_STATUS_OK) {
+    return final_status;
+  }
+
+  // We need a few queries to build the resolver. The current strategy might
+  // fail for some recursive definitions (e.g., arrays of records of arrays).
+  // First, one on the pg_attribute table to resolve column names/oids for
+  // record types. System columns (attnum < 1) and dropped columns are excluded:
+  // neither is part of a record value. Dropped columns also have atttypid = 0,
+  // which would make the whole record type unresolvable.
+  const std::string kColumnsQuery = R"(
+SELECT
+    attrelid,
+    attname,
+    atttypid
+FROM
+    pg_catalog.pg_attribute
+WHERE
+    attnum > 0 AND NOT attisdropped
+ORDER BY
+    attrelid, attnum
+)";
+
+  // Second, a query of the pg_type table. This query may need a few attempts to
+  // handle recursive definitions (e.g., record types with array column). This
+  // currently won't handle range types because those rows don't have child OID
+  // information. Array types are inserted after a successful insert of the
+  // element type.
+  const std::string kTypeQuery = R"(
+SELECT
+    oid,
+    typname,
+    typreceive,
+    typbasetype,
+    typarray,
+    typrelid
+FROM
+    pg_catalog.pg_type
+WHERE
+    (typreceive != 0 OR typname = 'aclitem') AND typtype != 'r' AND typreceive::TEXT != 'array_recv'
+ORDER BY
+    oid
+)";
+
+  // Create a new type resolver (this instance's type_resolver_ member
+  // will be updated at the end if this succeeds).
+  auto resolver = std::make_shared<PostgresTypeResolver>();
+
+  // Insert record type definitions (this includes table schemas)
+  pg_result* result = PQexec(conn, kColumnsQuery.c_str());
+  if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+    InsertPgAttributeResult(result, resolver);
+  } else {
+    SetError(error, "Failed to build type mapping table: ", PQerrorMessage(conn));
+    final_status = ADBC_STATUS_IO;
+  }
+  PQclear(result);
+
+  // Attempt filling the resolver a few times to handle recursive definitions.
+  // The loop is skipped entirely if the pg_attribute query already failed,
+  // because the resolver would be discarded anyway.
+  const int32_t kMaxAttempts = 3;
+  for (int32_t i = 0; i < kMaxAttempts && final_status == ADBC_STATUS_OK; i++) {
+    result = PQexec(conn, kTypeQuery.c_str());
+    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+      InsertPgTypeResult(result, resolver);
+    } else {
+      SetError(error, "Failed to build type mapping table: ", PQerrorMessage(conn));
+      final_status = ADBC_STATUS_IO;
+    }
+    PQclear(result);
+  }
+
+  // Disconnect since PostgreSQL connections can be heavy.
+  {
+    AdbcStatusCode status = Disconnect(&conn, error);
+    if (status != ADBC_STATUS_OK) final_status = status;
+  }
+
+  if (final_status == ADBC_STATUS_OK) {
+    type_resolver_ = std::move(resolver);
+  }
+
+  return final_status;
+}
+
+// Groups the rows of the pg_attribute query (attrelid, attname, atttypid),
+// which must arrive ordered by attrelid, into one class definition per
+// attrelid and registers each one with resolver->InsertClass(). Returns the
+// number of class definitions inserted.
+static inline int32_t InsertPgAttributeResult(
+    pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
+  int num_rows = PQntuples(result);
+  std::vector<std::pair<std::string, uint32_t>> columns;
+  uint32_t current_class_oid = 0;
+  int32_t n_added = 0;
+
+  for (int row = 0; row < num_rows; row++) {
+    // strtoul (not strtol): oids are unsigned 32-bit values and would
+    // saturate at LONG_MAX on platforms where long is 32 bits.
+    const uint32_t class_oid = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 0), /*str_end=*/nullptr, /*base=*/10));
+    const char* col_name = PQgetvalue(result, row, 1);
+    const uint32_t col_oid = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 2), /*str_end=*/nullptr, /*base=*/10));
+
+    // A new attrelid begins: flush the columns collected for the previous one.
+    if (class_oid != current_class_oid && !columns.empty()) {
+      resolver->InsertClass(current_class_oid, columns);
+      columns.clear();
+      n_added++;
+    }
+
+    // Track the class being collected on every row, not only after a flush;
+    // otherwise the first class's first column is inserted under oid 0.
+    current_class_oid = class_oid;
+    columns.emplace_back(col_name, col_oid);
+  }
+
+  if (!columns.empty()) {
+    resolver->InsertClass(current_class_oid, columns);
+    n_added++;
+  }
+
+  return n_added;
+}
+
+// Inserts one type per row of the pg_type query (oid, typname, typreceive,
+// typbasetype, typarray, typrelid) into resolver. When a row inserts
+// successfully and has an array type (typarray != 0), the array type
+// "_<typname>" is inserted right after it. Rows that fail to insert (e.g.,
+// because a type they depend on is not known yet) are skipped so that a later
+// call can retry them. Returns the number of successful Insert() calls.
+static inline int32_t InsertPgTypeResult(
+    pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
+  int num_rows = PQntuples(result);
+  int32_t n_added = 0;
+
+  for (int row = 0; row < num_rows; row++) {
+    // strtoul (not strtol): oids are unsigned 32-bit values and would
+    // saturate at LONG_MAX on platforms where long is 32 bits.
+    const uint32_t oid = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 0), /*str_end=*/nullptr, /*base=*/10));
+    const char* typname = PQgetvalue(result, row, 1);
+    const char* typreceive = PQgetvalue(result, row, 2);
+    const uint32_t typbasetype = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 3), /*str_end=*/nullptr, /*base=*/10));
+    const uint32_t typarray = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 4), /*str_end=*/nullptr, /*base=*/10));
+    const uint32_t typrelid = static_cast<uint32_t>(
+        std::strtoul(PQgetvalue(result, row, 5), /*str_end=*/nullptr, /*base=*/10));
+
+    // Special case the aclitem because it shows up in a bunch of internal tables
+    // (the query selects it even when it has no receive function).
+    if (strcmp(typname, "aclitem") == 0) {
+      typreceive = "aclitem_recv";
+    }
+
+    // Start from a zeroed item for every row so that no field (in particular
+    // child_oid, which is only set for array types) leaks between rows.
+    PostgresTypeResolver::Item item{};
+    item.oid = oid;
+    item.typname = typname;
+    item.typreceive = typreceive;
+    item.class_oid = typrelid;
+    item.base_oid = typbasetype;
+
+    if (resolver->Insert(item, nullptr) != NANOARROW_OK) {
+      continue;
+    }
+    n_added++;
+
+    // The element type is now known, so its array type can be added too
+    if (typarray != 0) {
+      std::string array_typname = StringBuilder("_", typname);
+      item.oid = typarray;
+      item.typname = array_typname.c_str();
+      item.typreceive = "array_recv";
+      item.child_oid = oid;
+
+      if (resolver->Insert(item, nullptr) == NANOARROW_OK) {
+        n_added++;
+      }
+    }
+  }
+
+  return n_added;
+}
+
} // namespace adbcpq
diff --git a/c/driver/postgresql/database.h b/c/driver/postgresql/database.h
index 9c51a77..f104647 100644
--- a/c/driver/postgresql/database.h
+++ b/c/driver/postgresql/database.h
@@ -24,7 +24,7 @@
#include <adbc.h>
#include <libpq-fe.h>
-#include "type.h"
+#include "postgres_type.h"
namespace adbcpq {
class PostgresDatabase {
@@ -42,12 +42,15 @@ class PostgresDatabase {
AdbcStatusCode Connect(PGconn** conn, struct AdbcError* error);
AdbcStatusCode Disconnect(PGconn** conn, struct AdbcError* error);
+ // Current cache of pg_type information. RebuildTypeResolver() swaps in a new
+ // instance rather than mutating this one, so holders of an earlier pointer
+ // (e.g., connections) keep the older resolver.
+ const std::shared_ptr<PostgresTypeResolver>& type_resolver() const {
+ return type_resolver_;
+ }
- const std::shared_ptr<TypeMapping>& type_mapping() const { return
type_mapping_; }
+ // Re-queries the database catalog and, only on success, replaces
+ // type_resolver(). Opens and closes its own connection.
+ AdbcStatusCode RebuildTypeResolver(struct AdbcError* error);
private:
int32_t open_connections_;
std::string uri_;
- std::shared_ptr<TypeMapping> type_mapping_;
+ std::shared_ptr<PostgresTypeResolver> type_resolver_;
};
} // namespace adbcpq
diff --git a/c/driver/postgresql/postgres_type.h
b/c/driver/postgresql/postgres_type.h
new file mode 100644
index 0000000..b327eb5
--- /dev/null
+++ b/c/driver/postgresql/postgres_type.h
@@ -0,0 +1,897 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cerrno>
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.hpp>
+
+namespace adbcpq {
+
+// An enum of the types available in most Postgres pg_type tables. These are
+// driver-internal identifiers, not Postgres oids: the oid of a type in a
+// particular database is only known at runtime (see
+// PostgresTypeResolver::GetOID()).
+enum class PostgresTypeId {
+ // Type of a default-constructed PostgresType.
+ kUninitialized,
+ kAclitem,
+ kAnyarray,
+ kAnycompatiblearray,
+ kArray,
+ kBit,
+ kBool,
+ kBox,
+ kBpchar,
+ kBrinBloomSummary,
+ kBrinMinmaxMultiSummary,
+ kBytea,
+ kCash,
+ kChar,
+ kCidr,
+ kCid,
+ kCircle,
+ kCstring,
+ kDate,
+ kDomain,
+ kFloat4,
+ kFloat8,
+ kInet,
+ kInt2,
+ kInt2vector,
+ kInt4,
+ kInt8,
+ kInterval,
+ kJson,
+ kJsonb,
+ kJsonpath,
+ kLine,
+ kLseg,
+ kMacaddr,
+ kMacaddr8,
+ kMultirange,
+ kName,
+ kNumeric,
+ kOid,
+ kOidvector,
+ kPath,
+ kPgDdlCommand,
+ kPgDependencies,
+ kPgLsn,
+ kPgMcvList,
+ kPgNdistinct,
+ kPgNodeTree,
+ kPgSnapshot,
+ kPoint,
+ kPoly,
+ kRange,
+ kRecord,
+ kRegclass,
+ kRegcollation,
+ kRegconfig,
+ kRegdictionary,
+ kRegnamespace,
+ kRegoperator,
+ kRegoper,
+ kRegprocedure,
+ kRegproc,
+ kRegrole,
+ kRegtype,
+ kText,
+ kTid,
+ kTime,
+ kTimestamp,
+ kTimestamptz,
+ kTimetz,
+ kTsquery,
+ kTsvector,
+ kTxidSnapshot,
+ kUnknown,
+ kUuid,
+ kVarbit,
+ kVarchar,
+ kVoid,
+ kXid8,
+ kXid,
+ kXml,
+};
+
+// Returns the receive function name as defined in the typreceive column
+// of the pg_type table. This name is the one that gets used to look up
+// the PostgresTypeId.
+static inline const char* PostgresTyprecv(PostgresTypeId type_id);
+
+// Returns a likely typname value for a given PostgresTypeId. This is useful
+// for testing and error messages but may not be the actual value present
+// in the pg_type typname column.
+static inline const char* PostgresTypname(PostgresTypeId type_id);
+
+// A vector of all type IDs, optionally (controlled by nested) including the
+// nested types PostgresTypeId::kArray, PostgresTypeId::kDomain,
+// PostgresTypeId::kRecord, and PostgresTypeId::kRange.
+static inline std::vector<PostgresTypeId> PostgresTypeIdAll(bool nested =
true);
+
+// Forward declaration (needed by PostgresType::FromSchema()).
+class PostgresTypeResolver;
+
+// An abstraction of a (potentially nested and/or parameterized) Postgres
+// data type. This class is where default type conversion to/from Arrow
+// is defined. It is intentionally copyable.
+class PostgresType {
+ public:
+  explicit PostgresType(PostgresTypeId type_id) : oid_(0), type_id_(type_id) {}
+
+  PostgresType() : PostgresType(PostgresTypeId::kUninitialized) {}
+
+  // Appends a copy of type, renamed to field_name, as the last child.
+  void AppendChild(const std::string& field_name, const PostgresType& type) {
+    children_.push_back(type.WithFieldName(field_name));
+  }
+
+  // Returns a copy of this type whose field name is field_name.
+  PostgresType WithFieldName(const std::string& field_name) const {
+    PostgresType out(*this);
+    out.field_name_ = field_name;
+    return out;
+  }
+
+  // Returns a copy of this type carrying the given pg_type oid and typname.
+  PostgresType WithPgTypeInfo(uint32_t oid, const std::string& typname) const {
+    PostgresType out(*this);
+    out.oid_ = oid;
+    out.typname_ = typname;
+    return out;
+  }
+
+  // Returns a new kArray type whose single child, named "item", is this type.
+  PostgresType Array(uint32_t oid = 0, const std::string& typname = "") const {
+    PostgresType out(PostgresTypeId::kArray);
+    out.AppendChild("item", *this);
+    out.oid_ = oid;
+    out.typname_ = typname;
+    return out;
+  }
+
+  // Returns a copy of this (base) type relabelled with the domain's oid and
+  // typname. type_id() and children are those of the base type, so a domain
+  // maps to the same Arrow type as its base type.
+  PostgresType Domain(uint32_t oid, const std::string& typname) const {
+    return WithPgTypeInfo(oid, typname);
+  }
+
+  // Returns a new kRange type whose single child, named "item", is this type.
+  PostgresType Range(uint32_t oid = 0, const std::string& typname = "") const {
+    PostgresType out(PostgresTypeId::kRange);
+    out.AppendChild("item", *this);
+    out.oid_ = oid;
+    out.typname_ = typname;
+    return out;
+  }
+
+  uint32_t oid() const { return oid_; }
+  PostgresTypeId type_id() const { return type_id_; }
+  const std::string& typname() const { return typname_; }
+  const std::string& field_name() const { return field_name_; }
+  int64_t n_children() const { return static_cast<int64_t>(children_.size()); }
+  const PostgresType& child(int64_t i) const { return children_[i]; }
+
+  // Sets appropriate fields of an ArrowSchema that has been initialized using
+  // ArrowSchemaInit. This is a recursive operation (i.e., nested types will
+  // initialize and set the appropriate number of children). Returns NANOARROW_OK
+  // on success, perhaps ENOMEM if memory cannot be allocated, or EINVAL for an
+  // array type without a child type. Types that do not have a corresponding
+  // Arrow type are returned as Binary with field metadata
+  // ADBC:postgresql:typname. These types can be represented as their binary
+  // COPY representation in the output.
+  ArrowErrorCode SetSchema(ArrowSchema* schema) const {
+    switch (type_id_) {
+      case PostgresTypeId::kBool:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_BOOL));
+        break;
+      case PostgresTypeId::kInt2:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_INT16));
+        break;
+      case PostgresTypeId::kInt4:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_INT32));
+        break;
+      case PostgresTypeId::kInt8:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_INT64));
+        break;
+      case PostgresTypeId::kFloat4:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_FLOAT));
+        break;
+      case PostgresTypeId::kFloat8:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_DOUBLE));
+        break;
+      case PostgresTypeId::kChar:
+      case PostgresTypeId::kBpchar:
+      case PostgresTypeId::kVarchar:
+      case PostgresTypeId::kText:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING));
+        break;
+      case PostgresTypeId::kBytea:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_BINARY));
+        break;
+
+      case PostgresTypeId::kRecord:
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(schema, n_children()));
+        for (int64_t i = 0; i < n_children(); i++) {
+          NANOARROW_RETURN_NOT_OK(children_[i].SetSchema(schema->children[i]));
+        }
+        break;
+
+      case PostgresTypeId::kArray:
+        // Array() always appends an "item" child, but a PostgresType constructed
+        // directly from kArray has none; reading children_[0] would then be UB.
+        if (children_.empty()) {
+          return EINVAL;
+        }
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_LIST));
+        NANOARROW_RETURN_NOT_OK(children_[0].SetSchema(schema->children[0]));
+        break;
+
+      default: {
+        // For any types we don't explicitly know how to deal with, we can still
+        // return the bytes postgres gives us and attach the type name as metadata
+        NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_BINARY));
+        nanoarrow::UniqueBuffer buffer;
+        NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(buffer.get(), nullptr));
+        NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
+            buffer.get(), ArrowCharView("ADBC:postgresql:typname"),
+            ArrowCharView(typname_.c_str())));
+        NANOARROW_RETURN_NOT_OK(
+            ArrowSchemaSetMetadata(schema, reinterpret_cast<char*>(buffer->data)));
+        break;
+      }
+    }
+
+    NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(schema, field_name_.c_str()));
+    return NANOARROW_OK;
+  }
+
+  static ArrowErrorCode FromSchema(const PostgresTypeResolver& resolver,
+                                   ArrowSchema* schema, PostgresType* out,
+                                   ArrowError* error);
+
+ private:
+  uint32_t oid_;
+  PostgresTypeId type_id_;
+  std::string typname_;
+  std::string field_name_;
+  std::vector<PostgresType> children_;
+};
+
+// Because type information is stored in a database's pg_type table, it can't
+// truly be resolved until runtime; however, querying the database's pg_type
table
+// for every result is unlikely to be reasonable. This class is a cache of
information
+// from the pg_type table with appropriate lookup tables to resolve a
PostgresType
+// instance based on a oid (which is the information that libpq provides when
+// inspecting a result object). Types can be added/removed from the pg_type
table
+// via SQL, so this cache may need to be periodically refreshed.
+class PostgresTypeResolver {
+ public:
+ struct Item {
+ uint32_t oid;
+ const char* typname;
+ const char* typreceive;
+ uint32_t child_oid;
+ uint32_t base_oid;
+ uint32_t class_oid;
+ };
+
+ PostgresTypeResolver() : base_(AllBase()) {}
+
+ // Place a resolved copy of a PostgresType with the appropriate oid in
type_out
+ // if NANOARROW_OK is returned or place a null-terminated error message into
error
+ // otherwise.
+ ArrowErrorCode Find(uint32_t oid, PostgresType* type_out, ArrowError* error)
const {
+ auto result = mapping_.find(oid);
+ if (result == mapping_.end()) {
+ ArrowErrorSet(error, "Postgres type with oid %ld not found",
+ static_cast<long>(oid)); // NOLINT(runtime/int)
+ return EINVAL;
+ }
+
+ *type_out = (*result).second;
+ return NANOARROW_OK;
+ }
+
+ ArrowErrorCode FindArray(uint32_t child_oid, PostgresType* type_out,
+ ArrowError* error) const {
+ auto array_oid_lookup = array_mapping_.find(child_oid);
+ if (array_oid_lookup == array_mapping_.end()) {
+ ArrowErrorSet(error, "Postgres array type with child oid %ld not found",
+ static_cast<long>(child_oid)); // NOLINT(runtime/int)
+ return EINVAL;
+ }
+
+ return Find(array_oid_lookup->second, type_out, error);
+ }
+
+ // Resolve the oid for a given type_id. Returns 0 if the oid cannot be
+ // resolved.
+ uint32_t GetOID(PostgresTypeId type_id) const {
+ auto result = reverse_mapping_.find(static_cast<int32_t>(type_id));
+ if (result == reverse_mapping_.end()) {
+ return 0;
+ } else {
+ return result->second;
+ }
+ }
+
+  // Insert a type into this resolver. Returns NANOARROW_OK on success or
+  // places a null-terminated error message into error otherwise. The order of
+  // Inserts matters: non-array types must be inserted before the corresponding
+  // array types, and class definitions must be registered with InsertClass()
+  // before the corresponding record type is inserted.
+  //
+  // The kind of type is chosen by looking up item.typreceive among the base
+  // types. Array, record, domain and range types are assembled from the types
+  // (or class) they reference; anything else is a copy of the matching base
+  // type carrying item.oid and item.typname. Entries already present for
+  // item.oid (or, in the lookup used by GetOID(), for the same type id) are
+  // left in place.
+  ArrowErrorCode Insert(const Item& item, ArrowError* error) {
+    auto base_lookup = base_.find(item.typreceive);
+    if (base_lookup == base_.end()) {
+      ArrowErrorSet(error, "Base type not found for type '%s' with receive function '%s'",
+                    item.typname, item.typreceive);
+      return ENOTSUP;
+    }
+
+    const PostgresType& base = base_lookup->second;
+    const PostgresTypeId kind = base.type_id();
+    PostgresType resolved;
+
+    if (kind == PostgresTypeId::kArray) {
+      // item.child_oid names the element type of the array.
+      PostgresType element;
+      NANOARROW_RETURN_NOT_OK(Find(item.child_oid, &element, error));
+      resolved = element.Array(item.oid, item.typname);
+      array_mapping_.insert({element.oid(), item.oid});
+    } else if (kind == PostgresTypeId::kRecord) {
+      // item.class_oid names a class registered via InsertClass(); every field
+      // of that class must already resolve to a known type.
+      std::vector<std::pair<std::string, uint32_t>> fields;
+      NANOARROW_RETURN_NOT_OK(ResolveClass(item.class_oid, &fields, error));
+
+      PostgresType record(PostgresTypeId::kRecord);
+      for (const auto& field : fields) {
+        PostgresType field_type;
+        NANOARROW_RETURN_NOT_OK(Find(field.second, &field_type, error));
+        record.AppendChild(field.first, field_type);
+      }
+      resolved = record.WithPgTypeInfo(item.oid, item.typname);
+    } else if (kind == PostgresTypeId::kDomain) {
+      PostgresType underlying;
+      NANOARROW_RETURN_NOT_OK(Find(item.base_oid, &underlying, error));
+      resolved = underlying.Domain(item.oid, item.typname);
+    } else if (kind == PostgresTypeId::kRange) {
+      PostgresType underlying;
+      NANOARROW_RETURN_NOT_OK(Find(item.base_oid, &underlying, error));
+      resolved = underlying.Range(item.oid, item.typname);
+    } else {
+      resolved = base.WithPgTypeInfo(item.oid, item.typname);
+    }
+
+    mapping_.insert({item.oid, resolved});
+    reverse_mapping_.insert({static_cast<int32_t>(kind), item.oid});
+    return NANOARROW_OK;
+  }
+
+  // Register the field layout of a composite type (class) under oid, as an
+  // ordered list of (field name, field type oid) pairs. This information can
+  // be found in the pg_attribute table (attname and atttypid, respectively).
+  // The field oids need not have been inserted yet; they are only resolved
+  // when a record type referring to this class is passed to Insert(). An
+  // existing definition for the same oid is left unchanged.
+  void InsertClass(uint32_t oid,
+                   const std::vector<std::pair<std::string, uint32_t>>& cls) {
+    classes_.emplace(oid, cls);
+  }
+
+ private:
+ // Fully resolved types keyed by oid; filled in by Insert() and read by Find().
+ std::unordered_map<uint32_t, PostgresType> mapping_;
+ // PostgresTypeId (stored as int32_t) -> first oid inserted with that type id;
+ // read by GetOID().
+ // We can't use PostgresTypeId as an unordered map key because there is no
+ // built-in hasher for an enum on gcc 4.8 (i.e., R 3.6 on Windows).
+ std::unordered_map<int32_t, uint32_t> reverse_mapping_;
+ // Element type oid -> oid of the array type built on it; read by FindArray().
+ std::unordered_map<uint32_t, uint32_t> array_mapping_;
+ // Class (composite type) field layouts registered by InsertClass(), keyed by
+ // class oid; read by ResolveClass().
+ std::unordered_map<uint32_t, std::vector<std::pair<std::string, uint32_t>>>
classes_;
+ // Sentinel types keyed by receive function name, built by AllBase() in the
+ // constructor; Insert() uses it to decide what kind of type it is adding.
+ std::unordered_map<std::string, PostgresType> base_;
+
+  // Copy the field layout registered for class oid (via InsertClass()) into
+  // out. Returns EINVAL with a message in error if no such class exists.
+  // Only reads classes_, hence const.
+  ArrowErrorCode ResolveClass(uint32_t oid,
+                              std::vector<std::pair<std::string, uint32_t>>* out,
+                              ArrowError* error) const {
+    auto result = classes_.find(oid);
+    if (result == classes_.end()) {
+      // oids are unsigned 32-bit values: format them as unsigned long so that
+      // large oids print correctly where long is only 32 bits (e.g., Windows).
+      ArrowErrorSet(error, "Class definition with oid %lu not found",
+                    static_cast<unsigned long>(oid));  // NOLINT(runtime/int)
+      return EINVAL;
+    }
+
+    *out = result->second;
+    return NANOARROW_OK;
+  }
+
+  // Build the receive-function-name -> sentinel PostgresType table that
+  // Insert() uses to classify new types. There is one sentinel per entry of
+  // PostgresTypeIdAll(); each has oid 0 and PostgresTypname() as its typname.
+  static std::unordered_map<std::string, PostgresType> AllBase() {
+    std::unordered_map<std::string, PostgresType> by_receive;
+    for (const PostgresTypeId id : PostgresTypeIdAll()) {
+      by_receive.emplace(PostgresTyprecv(id),
+                         PostgresType(id).WithPgTypeInfo(0, PostgresTypname(id)));
+    }
+    return by_receive;
+  }
+};
+
+inline ArrowErrorCode PostgresType::FromSchema(const PostgresTypeResolver& resolver,
+                                               ArrowSchema* schema, PostgresType* out,
+                                               ArrowError* error) {
+  ArrowSchemaView schema_view;
+  NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
+  const ArrowType arrow_type = schema_view.type;
+
+  // List-like Arrow types become the Postgres array type registered for the
+  // (recursively resolved) Postgres type of their single child.
+  if (arrow_type == NANOARROW_TYPE_LIST || arrow_type == NANOARROW_TYPE_LARGE_LIST ||
+      arrow_type == NANOARROW_TYPE_FIXED_SIZE_LIST) {
+    PostgresType element;
+    NANOARROW_RETURN_NOT_OK(
+        PostgresType::FromSchema(resolver, schema->children[0], &element, error));
+    return resolver.FindArray(element.oid(), out, error);
+  }
+
+  // Every other supported Arrow type maps onto a fixed Postgres type id whose
+  // oid is then looked up in the resolver. Unsigned integers are widened to a
+  // larger signed integer type.
+  PostgresTypeId target = PostgresTypeId::kUnknown;
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      target = PostgresTypeId::kBool;
+      break;
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_INT16:
+      target = PostgresTypeId::kInt2;
+      break;
+    case NANOARROW_TYPE_UINT16:
+    case NANOARROW_TYPE_INT32:
+      target = PostgresTypeId::kInt4;
+      break;
+    case NANOARROW_TYPE_UINT32:
+    case NANOARROW_TYPE_INT64:
+      target = PostgresTypeId::kInt8;
+      break;
+    case NANOARROW_TYPE_FLOAT:
+      target = PostgresTypeId::kFloat4;
+      break;
+    case NANOARROW_TYPE_DOUBLE:
+      target = PostgresTypeId::kFloat8;
+      break;
+    case NANOARROW_TYPE_STRING:
+      target = PostgresTypeId::kText;
+      break;
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+      target = PostgresTypeId::kBytea;
+      break;
+    default:
+      ArrowErrorSet(error, "Can't map Arrow type '%s' to Postgres type",
+                    ArrowTypeString(arrow_type));
+      return ENOTSUP;
+  }
+
+  return resolver.Find(resolver.GetOID(target), out, error);
+}
+
+static inline const char* PostgresTyprecv(PostgresTypeId type_id) {
+  // Binary receive function name for each type id. PostgresTypeResolver keys
+  // its base types by these names and matches them against the typreceive of
+  // inserted types. Type ids that are not listed yield an empty string.
+  struct Entry {
+    PostgresTypeId id;
+    const char* name;
+  };
+  static const Entry kReceiveNames[] = {
+      {PostgresTypeId::kAclitem, "aclitem_recv"},
+      {PostgresTypeId::kAnyarray, "anyarray_recv"},
+      {PostgresTypeId::kAnycompatiblearray, "anycompatiblearray_recv"},
+      {PostgresTypeId::kArray, "array_recv"},
+      {PostgresTypeId::kBit, "bit_recv"},
+      {PostgresTypeId::kBool, "boolrecv"},
+      {PostgresTypeId::kBox, "box_recv"},
+      {PostgresTypeId::kBpchar, "bpcharrecv"},
+      {PostgresTypeId::kBrinBloomSummary, "brin_bloom_summary_recv"},
+      {PostgresTypeId::kBrinMinmaxMultiSummary, "brin_minmax_multi_summary_recv"},
+      {PostgresTypeId::kBytea, "bytearecv"},
+      {PostgresTypeId::kCash, "cash_recv"},
+      {PostgresTypeId::kChar, "charrecv"},
+      {PostgresTypeId::kCidr, "cidr_recv"},
+      {PostgresTypeId::kCid, "cidrecv"},
+      {PostgresTypeId::kCircle, "circle_recv"},
+      {PostgresTypeId::kCstring, "cstring_recv"},
+      {PostgresTypeId::kDate, "date_recv"},
+      {PostgresTypeId::kDomain, "domain_recv"},
+      {PostgresTypeId::kFloat4, "float4recv"},
+      {PostgresTypeId::kFloat8, "float8recv"},
+      {PostgresTypeId::kInet, "inet_recv"},
+      {PostgresTypeId::kInt2, "int2recv"},
+      {PostgresTypeId::kInt2vector, "int2vectorrecv"},
+      {PostgresTypeId::kInt4, "int4recv"},
+      {PostgresTypeId::kInt8, "int8recv"},
+      {PostgresTypeId::kInterval, "interval_recv"},
+      {PostgresTypeId::kJson, "json_recv"},
+      {PostgresTypeId::kJsonb, "jsonb_recv"},
+      {PostgresTypeId::kJsonpath, "jsonpath_recv"},
+      {PostgresTypeId::kLine, "line_recv"},
+      {PostgresTypeId::kLseg, "lseg_recv"},
+      {PostgresTypeId::kMacaddr, "macaddr_recv"},
+      {PostgresTypeId::kMacaddr8, "macaddr8_recv"},
+      {PostgresTypeId::kMultirange, "multirange_recv"},
+      {PostgresTypeId::kName, "namerecv"},
+      {PostgresTypeId::kNumeric, "numeric_recv"},
+      {PostgresTypeId::kOid, "oidrecv"},
+      {PostgresTypeId::kOidvector, "oidvectorrecv"},
+      {PostgresTypeId::kPath, "path_recv"},
+      {PostgresTypeId::kPgNodeTree, "pg_node_tree_recv"},
+      {PostgresTypeId::kPgNdistinct, "pg_ndistinct_recv"},
+      {PostgresTypeId::kPgDependencies, "pg_dependencies_recv"},
+      {PostgresTypeId::kPgLsn, "pg_lsn_recv"},
+      {PostgresTypeId::kPgMcvList, "pg_mcv_list_recv"},
+      {PostgresTypeId::kPgDdlCommand, "pg_ddl_command_recv"},
+      {PostgresTypeId::kPgSnapshot, "pg_snapshot_recv"},
+      {PostgresTypeId::kPoint, "point_recv"},
+      {PostgresTypeId::kPoly, "poly_recv"},
+      {PostgresTypeId::kRange, "range_recv"},
+      {PostgresTypeId::kRecord, "record_recv"},
+      {PostgresTypeId::kRegclass, "regclassrecv"},
+      {PostgresTypeId::kRegcollation, "regcollationrecv"},
+      {PostgresTypeId::kRegconfig, "regconfigrecv"},
+      {PostgresTypeId::kRegdictionary, "regdictionaryrecv"},
+      {PostgresTypeId::kRegnamespace, "regnamespacerecv"},
+      {PostgresTypeId::kRegoperator, "regoperatorrecv"},
+      {PostgresTypeId::kRegoper, "regoperrecv"},
+      {PostgresTypeId::kRegprocedure, "regprocedurerecv"},
+      {PostgresTypeId::kRegproc, "regprocrecv"},
+      {PostgresTypeId::kRegrole, "regrolerecv"},
+      {PostgresTypeId::kRegtype, "regtyperecv"},
+      {PostgresTypeId::kText, "textrecv"},
+      {PostgresTypeId::kTid, "tidrecv"},
+      {PostgresTypeId::kTime, "time_recv"},
+      {PostgresTypeId::kTimestamp, "timestamp_recv"},
+      {PostgresTypeId::kTimestamptz, "timestamptz_recv"},
+      {PostgresTypeId::kTimetz, "timetz_recv"},
+      {PostgresTypeId::kTsquery, "tsqueryrecv"},
+      {PostgresTypeId::kTsvector, "tsvectorrecv"},
+      {PostgresTypeId::kTxidSnapshot, "txid_snapshot_recv"},
+      {PostgresTypeId::kUnknown, "unknownrecv"},
+      {PostgresTypeId::kUuid, "uuid_recv"},
+      {PostgresTypeId::kVarbit, "varbit_recv"},
+      {PostgresTypeId::kVarchar, "varcharrecv"},
+      {PostgresTypeId::kVoid, "void_recv"},
+      {PostgresTypeId::kXid8, "xid8recv"},
+      {PostgresTypeId::kXid, "xidrecv"},
+      {PostgresTypeId::kXml, "xml_recv"},
+  };
+
+  for (const Entry& entry : kReceiveNames) {
+    if (entry.id == type_id) {
+      return entry.name;
+    }
+  }
+  return "";
+}
+
+static inline const char* PostgresTypname(PostgresTypeId type_id) {
+  // Human-readable name for each type id; PostgresTypeResolver uses it as the
+  // typname of its sentinel base types. Type ids that are not listed yield an
+  // empty string.
+  struct Entry {
+    PostgresTypeId id;
+    const char* name;
+  };
+  static const Entry kTypeNames[] = {
+      {PostgresTypeId::kAclitem, "aclitem"},
+      {PostgresTypeId::kAnyarray, "anyarray"},
+      {PostgresTypeId::kAnycompatiblearray, "anycompatiblearray"},
+      {PostgresTypeId::kArray, "array"},
+      {PostgresTypeId::kBit, "bit"},
+      {PostgresTypeId::kBool, "bool"},
+      {PostgresTypeId::kBox, "box"},
+      {PostgresTypeId::kBpchar, "bpchar"},
+      {PostgresTypeId::kBrinBloomSummary, "brin_bloom_summary"},
+      {PostgresTypeId::kBrinMinmaxMultiSummary, "brin_minmax_multi_summary"},
+      {PostgresTypeId::kBytea, "bytea"},
+      {PostgresTypeId::kCash, "cash"},
+      {PostgresTypeId::kChar, "char"},
+      {PostgresTypeId::kCidr, "cidr"},
+      {PostgresTypeId::kCid, "cid"},
+      {PostgresTypeId::kCircle, "circle"},
+      {PostgresTypeId::kCstring, "cstring"},
+      {PostgresTypeId::kDate, "date"},
+      {PostgresTypeId::kDomain, "domain"},
+      {PostgresTypeId::kFloat4, "float4"},
+      {PostgresTypeId::kFloat8, "float8"},
+      {PostgresTypeId::kInet, "inet"},
+      {PostgresTypeId::kInt2, "int2"},
+      {PostgresTypeId::kInt2vector, "int2vector"},
+      {PostgresTypeId::kInt4, "int4"},
+      {PostgresTypeId::kInt8, "int8"},
+      {PostgresTypeId::kInterval, "interval"},
+      {PostgresTypeId::kJson, "json"},
+      {PostgresTypeId::kJsonb, "jsonb"},
+      {PostgresTypeId::kJsonpath, "jsonpath"},
+      {PostgresTypeId::kLine, "line"},
+      {PostgresTypeId::kLseg, "lseg"},
+      {PostgresTypeId::kMacaddr, "macaddr"},
+      {PostgresTypeId::kMacaddr8, "macaddr8"},
+      {PostgresTypeId::kMultirange, "multirange"},
+      {PostgresTypeId::kName, "name"},
+      {PostgresTypeId::kNumeric, "numeric"},
+      {PostgresTypeId::kOid, "oid"},
+      {PostgresTypeId::kOidvector, "oidvector"},
+      {PostgresTypeId::kPath, "path"},
+      {PostgresTypeId::kPgNodeTree, "pg_node_tree"},
+      {PostgresTypeId::kPgNdistinct, "pg_ndistinct"},
+      {PostgresTypeId::kPgDependencies, "pg_dependencies"},
+      {PostgresTypeId::kPgLsn, "pg_lsn"},
+      {PostgresTypeId::kPgMcvList, "pg_mcv_list"},
+      {PostgresTypeId::kPgDdlCommand, "pg_ddl_command"},
+      {PostgresTypeId::kPgSnapshot, "pg_snapshot"},
+      {PostgresTypeId::kPoint, "point"},
+      {PostgresTypeId::kPoly, "poly"},
+      {PostgresTypeId::kRange, "range"},
+      {PostgresTypeId::kRecord, "record"},
+      {PostgresTypeId::kRegclass, "regclass"},
+      {PostgresTypeId::kRegcollation, "regcollation"},
+      {PostgresTypeId::kRegconfig, "regconfig"},
+      {PostgresTypeId::kRegdictionary, "regdictionary"},
+      {PostgresTypeId::kRegnamespace, "regnamespace"},
+      {PostgresTypeId::kRegoperator, "regoperator"},
+      {PostgresTypeId::kRegoper, "regoper"},
+      {PostgresTypeId::kRegprocedure, "regprocedure"},
+      {PostgresTypeId::kRegproc, "regproc"},
+      {PostgresTypeId::kRegrole, "regrole"},
+      {PostgresTypeId::kRegtype, "regtype"},
+      {PostgresTypeId::kText, "text"},
+      {PostgresTypeId::kTid, "tid"},
+      {PostgresTypeId::kTime, "time"},
+      {PostgresTypeId::kTimestamp, "timestamp"},
+      {PostgresTypeId::kTimestamptz, "timestamptz"},
+      {PostgresTypeId::kTimetz, "timetz"},
+      {PostgresTypeId::kTsquery, "tsquery"},
+      {PostgresTypeId::kTsvector, "tsvector"},
+      {PostgresTypeId::kTxidSnapshot, "txid_snapshot"},
+      {PostgresTypeId::kUnknown, "unknown"},
+      {PostgresTypeId::kUuid, "uuid"},
+      {PostgresTypeId::kVarbit, "varbit"},
+      {PostgresTypeId::kVarchar, "varchar"},
+      {PostgresTypeId::kVoid, "void"},
+      {PostgresTypeId::kXid8, "xid8"},
+      {PostgresTypeId::kXid, "xid"},
+      {PostgresTypeId::kXml, "xml"},
+  };
+
+  for (const Entry& entry : kTypeNames) {
+    if (entry.id == type_id) {
+      return entry.name;
+    }
+  }
+  return "";
+}
+
+static inline std::vector<PostgresTypeId> PostgresTypeIdAll(bool nested) {
+  using Id = PostgresTypeId;
+
+  // Every type id that does not refer to another type, in a fixed order.
+  std::vector<Id> ids = {
+      Id::kAclitem,        Id::kAnyarray,       Id::kAnycompatiblearray, Id::kBit,
+      Id::kBool,           Id::kBox,            Id::kBpchar,             Id::kBrinBloomSummary,
+      Id::kBrinMinmaxMultiSummary, Id::kBytea,  Id::kCash,               Id::kChar,
+      Id::kCidr,           Id::kCid,            Id::kCircle,             Id::kCstring,
+      Id::kDate,           Id::kFloat4,         Id::kFloat8,             Id::kInet,
+      Id::kInt2,           Id::kInt2vector,     Id::kInt4,               Id::kInt8,
+      Id::kInterval,       Id::kJson,           Id::kJsonb,              Id::kJsonpath,
+      Id::kLine,           Id::kLseg,           Id::kMacaddr,            Id::kMacaddr8,
+      Id::kMultirange,     Id::kName,           Id::kNumeric,            Id::kOid,
+      Id::kOidvector,      Id::kPath,           Id::kPgNodeTree,         Id::kPgNdistinct,
+      Id::kPgDependencies, Id::kPgLsn,          Id::kPgMcvList,          Id::kPgDdlCommand,
+      Id::kPgSnapshot,     Id::kPoint,          Id::kPoly,               Id::kRegclass,
+      Id::kRegcollation,   Id::kRegconfig,      Id::kRegdictionary,      Id::kRegnamespace,
+      Id::kRegoperator,    Id::kRegoper,        Id::kRegprocedure,       Id::kRegproc,
+      Id::kRegrole,        Id::kRegtype,        Id::kText,               Id::kTid,
+      Id::kTime,           Id::kTimestamp,      Id::kTimestamptz,        Id::kTimetz,
+      Id::kTsquery,        Id::kTsvector,       Id::kTxidSnapshot,       Id::kUnknown,
+      Id::kUuid,           Id::kVarbit,         Id::kVarchar,            Id::kVoid,
+      Id::kXid8,           Id::kXid,            Id::kXml};
+
+  if (!nested) {
+    return ids;
+  }
+
+  // The nested kinds (array, record, range, domain) are appended last, in
+  // this order, only when requested.
+  const Id nested_ids[] = {Id::kArray, Id::kRecord, Id::kRange, Id::kDomain};
+  ids.insert(ids.end(), nested_ids, nested_ids + 4);
+  return ids;
+}
+
+} // namespace adbcpq
diff --git a/c/driver/postgresql/postgres_type_test.cc
b/c/driver/postgresql/postgres_type_test.cc
new file mode 100644
index 0000000..74fe53c
--- /dev/null
+++ b/c/driver/postgresql/postgres_type_test.cc
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+
+#include <gtest/gtest.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "postgres_type.h"
+
+using adbcpq::PostgresType;
+using adbcpq::PostgresTypeId;
+using adbcpq::PostgresTypeResolver;
+
+class MockTypeResolver : public PostgresTypeResolver {
+ public:
+  // Populate the resolver without a database connection. Every non-nested
+  // PostgresTypeId gets a consecutive oid starting at 1 (in
+  // PostgresTypeIdAll(false) order). These are followed by one array
+  // ("_bool"), range ("boolrange"), domain ("custombool") and record
+  // ("customrecord") type. Returns the first error reported by Insert(), if any.
+  ArrowErrorCode Init() {
+    PostgresTypeResolver::Item item;
+    uint32_t next_oid = 1;
+
+    // Base types; the receive function name decides how each is classified.
+    for (PostgresTypeId type_id : adbcpq::PostgresTypeIdAll(false)) {
+      item.oid = next_oid++;
+      item.typname = adbcpq::PostgresTypname(type_id);
+      item.typreceive = adbcpq::PostgresTyprecv(type_id);
+      NANOARROW_RETURN_NOT_OK(Insert(item, nullptr));
+    }
+
+    // Array of bool.
+    item.oid = next_oid++;
+    item.typname = "_bool";
+    item.typreceive = "array_recv";
+    item.child_oid = GetOID(PostgresTypeId::kBool);
+    NANOARROW_RETURN_NOT_OK(Insert(item, nullptr));
+
+    // Range over bool.
+    item.oid = next_oid++;
+    item.typname = "boolrange";
+    item.typreceive = "range_recv";
+    item.base_oid = GetOID(PostgresTypeId::kBool);
+    NANOARROW_RETURN_NOT_OK(Insert(item, nullptr));
+
+    // Domain over bool.
+    item.oid = next_oid++;
+    item.typname = "custombool";
+    item.typreceive = "domain_recv";
+    item.base_oid = GetOID(PostgresTypeId::kBool);
+    NANOARROW_RETURN_NOT_OK(Insert(item, nullptr));
+
+    // A two-field class consumes an oid of its own; the record type that
+    // refers to it takes the next one.
+    const uint32_t class_oid = next_oid++;
+    InsertClass(class_oid, {{"int4_col", GetOID(PostgresTypeId::kInt4)},
+                            {"text_col", GetOID(PostgresTypeId::kText)}});
+
+    item.oid = next_oid++;
+    item.typname = "customrecord";
+    item.typreceive = "record_recv";
+    item.class_oid = class_oid;
+    return Insert(item, nullptr);
+  }
+};
+
+TEST(PostgresTypeTest, PostgresTypeBasic) {
+  PostgresType type(PostgresTypeId::kBool);
+
+  // A freshly constructed type has an empty field name and typname, oid 0
+  // and no children.
+  EXPECT_EQ(type.field_name(), "");
+  EXPECT_EQ(type.typname(), "");
+  EXPECT_EQ(type.type_id(), PostgresTypeId::kBool);
+  EXPECT_EQ(type.oid(), 0);
+  EXPECT_EQ(type.n_children(), 0);
+
+  // WithPgTypeInfo() and WithFieldName() return copies that differ only in
+  // the attributes being set.
+  PostgresType with_info = type.WithPgTypeInfo(1234, "some_typename");
+  EXPECT_EQ(with_info.oid(), 1234);
+  EXPECT_EQ(with_info.typname(), "some_typename");
+  EXPECT_EQ(with_info.type_id(), type.type_id());
+
+  PostgresType with_name = type.WithFieldName("some name");
+  EXPECT_EQ(with_name.field_name(), "some name");
+  EXPECT_EQ(with_name.oid(), type.oid());
+  EXPECT_EQ(with_name.type_id(), type.type_id());
+
+  // Array() and Range() wrap the original type as their single child.
+  auto expect_wrapper = [&type](PostgresType wrapper, const char* expected_typname) {
+    EXPECT_EQ(wrapper.oid(), 12345);
+    EXPECT_EQ(wrapper.typname(), expected_typname);
+    EXPECT_EQ(wrapper.n_children(), 1);
+    EXPECT_EQ(wrapper.child(0).oid(), type.oid());
+    EXPECT_EQ(wrapper.child(0).type_id(), type.type_id());
+  };
+  expect_wrapper(type.Array(12345, "array type name"), "array type name");
+  expect_wrapper(type.Range(12345, "range type name"), "range type name");
+
+  // Domain() keeps the underlying type id but carries its own oid/typname.
+  PostgresType domain = type.Domain(123456, "domain type name");
+  EXPECT_EQ(domain.oid(), 123456);
+  EXPECT_EQ(domain.typname(), "domain type name");
+  EXPECT_EQ(domain.type_id(), type.type_id());
+
+  // Records collect named children appended with AppendChild().
+  PostgresType record(PostgresTypeId::kRecord);
+  record.AppendChild("col1", type);
+  EXPECT_EQ(record.type_id(), PostgresTypeId::kRecord);
+  EXPECT_EQ(record.n_children(), 1);
+  EXPECT_EQ(record.child(0).type_id(), type.type_id());
+  EXPECT_EQ(record.child(0).field_name(), "col1");
+}
+
+TEST(PostgresTypeTest, PostgresTypeSetSchema) {
+  ArrowSchema schema;
+
+  // Scalar types map onto a single Arrow format string.
+  auto expect_format = [&schema](PostgresType type, const char* format) {
+    ArrowSchemaInit(&schema);
+    EXPECT_EQ(type.SetSchema(&schema), NANOARROW_OK);
+    EXPECT_STREQ(schema.format, format);
+    schema.release(&schema);
+  };
+  expect_format(PostgresType(PostgresTypeId::kBool), "b");
+  expect_format(PostgresType(PostgresTypeId::kInt2), "s");
+  expect_format(PostgresType(PostgresTypeId::kInt4), "i");
+  expect_format(PostgresType(PostgresTypeId::kInt8), "l");
+  expect_format(PostgresType(PostgresTypeId::kFloat4), "f");
+  expect_format(PostgresType(PostgresTypeId::kFloat8), "g");
+  expect_format(PostgresType(PostgresTypeId::kText), "u");
+  expect_format(PostgresType(PostgresTypeId::kBytea), "z");
+
+  // Arrays become Arrow lists of their element type.
+  ArrowSchemaInit(&schema);
+  EXPECT_EQ(PostgresType(PostgresTypeId::kBool).Array().SetSchema(&schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+l");
+  EXPECT_STREQ(schema.children[0]->format, "b");
+  schema.release(&schema);
+
+  // Records become Arrow structs with one child per field.
+  ArrowSchemaInit(&schema);
+  PostgresType record(PostgresTypeId::kRecord);
+  record.AppendChild("col1", PostgresType(PostgresTypeId::kBool));
+  EXPECT_EQ(record.SetSchema(&schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  EXPECT_STREQ(schema.children[0]->format, "b");
+  schema.release(&schema);
+
+  // A type without a dedicated mapping (here brin_minmax_multi_summary) is
+  // exposed as binary and records its typname in the field metadata.
+  ArrowSchemaInit(&schema);
+  PostgresType unknown(PostgresTypeId::kBrinMinmaxMultiSummary);
+  EXPECT_EQ(unknown.WithPgTypeInfo(0, "some_name").SetSchema(&schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "z");
+
+  ArrowStringView typname = ArrowCharView("<not found>");
+  ArrowMetadataGetValue(schema.metadata, ArrowCharView("ADBC:postgresql:typname"),
+                        &typname);
+  EXPECT_EQ(std::string(typname.data, typname.size_bytes), "some_name");
+  schema.release(&schema);
+}
+
+TEST(PostgresTypeTest, PostgresTypeFromSchema) {
+  ArrowSchema schema;
+  PostgresType type;
+  MockTypeResolver resolver;
+  ASSERT_EQ(resolver.Init(), NANOARROW_OK);
+
+  // Each scalar Arrow type resolves to a fixed Postgres type; unsigned
+  // integers are widened to a larger signed integer type.
+  auto expect_maps_to = [&](ArrowType arrow_type, PostgresTypeId expected) {
+    ASSERT_EQ(ArrowSchemaInitFromType(&schema, arrow_type), NANOARROW_OK);
+    EXPECT_EQ(adbcpq::PostgresType::FromSchema(resolver, &schema, &type, nullptr),
+              NANOARROW_OK);
+    EXPECT_EQ(type.type_id(), expected);
+    schema.release(&schema);
+  };
+  expect_maps_to(NANOARROW_TYPE_BOOL, PostgresTypeId::kBool);
+  expect_maps_to(NANOARROW_TYPE_INT8, PostgresTypeId::kInt2);
+  expect_maps_to(NANOARROW_TYPE_UINT8, PostgresTypeId::kInt2);
+  expect_maps_to(NANOARROW_TYPE_INT16, PostgresTypeId::kInt2);
+  expect_maps_to(NANOARROW_TYPE_UINT16, PostgresTypeId::kInt4);
+  expect_maps_to(NANOARROW_TYPE_INT32, PostgresTypeId::kInt4);
+  expect_maps_to(NANOARROW_TYPE_UINT32, PostgresTypeId::kInt8);
+  expect_maps_to(NANOARROW_TYPE_INT64, PostgresTypeId::kInt8);
+  expect_maps_to(NANOARROW_TYPE_FLOAT, PostgresTypeId::kFloat4);
+  expect_maps_to(NANOARROW_TYPE_DOUBLE, PostgresTypeId::kFloat8);
+  expect_maps_to(NANOARROW_TYPE_BINARY, PostgresTypeId::kBytea);
+  expect_maps_to(NANOARROW_TYPE_STRING, PostgresTypeId::kText);
+
+  // A list resolves to the registered Postgres array of its element type.
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetType(&schema, NANOARROW_TYPE_LIST), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_BOOL), NANOARROW_OK);
+  EXPECT_EQ(adbcpq::PostgresType::FromSchema(resolver, &schema, &type, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(type.type_id(), PostgresTypeId::kArray);
+  EXPECT_EQ(type.child(0).type_id(), PostgresTypeId::kBool);
+  schema.release(&schema);
+
+  // Arrow types without a Postgres counterpart are rejected with ENOTSUP.
+  ArrowError error;
+  ASSERT_EQ(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO),
+            NANOARROW_OK);
+  EXPECT_EQ(adbcpq::PostgresType::FromSchema(resolver, &schema, &type, &error), ENOTSUP);
+  EXPECT_STREQ(error.message,
+               "Can't map Arrow type 'interval_month_day_nano' to Postgres type");
+  schema.release(&schema);
+}
+
+// Exercises PostgresTypeResolver directly (without the mock): the lookup and
+// insert error paths first, then successful inserts of a base type and of
+// array, range and domain types built on top of it. The steps share one
+// resolver and one Item, so their order matters.
+TEST(PostgresTypeTest, PostgresTypeResolver) {
+ PostgresTypeResolver resolver;
+ ArrowError error;
+ PostgresType type;
+ PostgresTypeResolver::Item item;
+
+ // Check error for type not found
+ EXPECT_EQ(resolver.Find(123, &type, &error), EINVAL);
+ EXPECT_STREQ(ArrowErrorMessage(&error), "Postgres type with oid 123 not
found");
+
+ // Check error for unsupported type_id name
+ // (types are classified by receive function name, so an unknown receive
+ // function is rejected with ENOTSUP)
+ item.oid = 123;
+ item.typname = "invalid";
+ item.typreceive = "invalid_recv";
+ EXPECT_EQ(resolver.Insert(item, &error), ENOTSUP);
+ EXPECT_STREQ(
+ ArrowErrorMessage(&error),
+ "Base type not found for type 'invalid' with receive function
'invalid_recv'");
+
+ // item.oid stays 123 for every failing insert below; each one fails while
+ // resolving the type it refers to, which surfaces as EINVAL.
+ // Check error for Array with unknown child
+ item.typname = "some_array";
+ item.typreceive = "array_recv";
+ item.child_oid = 1234;
+ EXPECT_EQ(resolver.Insert(item, &error), EINVAL);
+ EXPECT_STREQ(ArrowErrorMessage(&error), "Postgres type with oid 1234 not
found");
+
+ // Check error for Range with unknown child
+ item.typname = "some_range";
+ item.typreceive = "range_recv";
+ item.base_oid = 12345;
+ EXPECT_EQ(resolver.Insert(item, &error), EINVAL);
+ EXPECT_STREQ(ArrowErrorMessage(&error), "Postgres type with oid 12345 not
found");
+
+ // Check error for Domain with unknown child
+ item.typname = "some_domain";
+ item.typreceive = "domain_recv";
+ item.base_oid = 123456;
+ EXPECT_EQ(resolver.Insert(item, &error), EINVAL);
+ EXPECT_STREQ(ArrowErrorMessage(&error), "Postgres type with oid 123456 not
found");
+
+ // Check error for Record with unknown class
+ item.typname = "some_record";
+ item.typreceive = "record_recv";
+ item.class_oid = 123456;
+ EXPECT_EQ(resolver.Insert(item, &error), EINVAL);
+ EXPECT_STREQ(ArrowErrorMessage(&error), "Class definition with oid 123456
not found");
+
+ // Check insert/resolve of regular type
+ // (oid 10 becomes the bool type that the nested types below refer to)
+ item.typname = "some_type_name";
+ item.typreceive = "boolrecv";
+ item.oid = 10;
+ EXPECT_EQ(resolver.Insert(item, &error), NANOARROW_OK);
+ EXPECT_EQ(resolver.Find(10, &type, &error), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 10);
+ EXPECT_EQ(type.typname(), "some_type_name");
+ EXPECT_EQ(type.type_id(), PostgresTypeId::kBool);
+
+ // Check insert/resolve of array type
+ item.oid = 11;
+ item.typname = "some_array_type_name";
+ item.typreceive = "array_recv";
+ item.child_oid = 10;
+ EXPECT_EQ(resolver.Insert(item, &error), NANOARROW_OK);
+ EXPECT_EQ(resolver.Find(11, &type, &error), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 11);
+ EXPECT_EQ(type.typname(), "some_array_type_name");
+ EXPECT_EQ(type.type_id(), PostgresTypeId::kArray);
+ EXPECT_EQ(type.child(0).oid(), 10);
+ EXPECT_EQ(type.child(0).type_id(), PostgresTypeId::kBool);
+
+ // Check reverse lookup of array type from item type
+ EXPECT_EQ(resolver.FindArray(10, &type, &error), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 11);
+
+ // Check insert/resolve of range type
+ item.oid = 12;
+ item.typname = "some_range_type_name";
+ item.typreceive = "range_recv";
+ item.base_oid = 10;
+ EXPECT_EQ(resolver.Insert(item, &error), NANOARROW_OK);
+ EXPECT_EQ(resolver.Find(12, &type, &error), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 12);
+ EXPECT_EQ(type.typname(), "some_range_type_name");
+ EXPECT_EQ(type.type_id(), PostgresTypeId::kRange);
+ EXPECT_EQ(type.child(0).oid(), 10);
+ EXPECT_EQ(type.child(0).type_id(), PostgresTypeId::kBool);
+
+ // Check insert/resolve of domain type
+ // (a domain reports the type id of its base type, not a separate kDomain)
+ item.oid = 13;
+ item.typname = "some_domain_type_name";
+ item.typreceive = "domain_recv";
+ item.base_oid = 10;
+ EXPECT_EQ(resolver.Insert(item, &error), NANOARROW_OK);
+ EXPECT_EQ(resolver.Find(13, &type, &error), NANOARROW_OK);
+ EXPECT_EQ(type.oid(), 13);
+ EXPECT_EQ(type.typname(), "some_domain_type_name");
+ EXPECT_EQ(type.type_id(), PostgresTypeId::kBool);
+}
+
+TEST(PostgresTypeTest, PostgresTypeResolveRecord) {
+  // MockTypeResolver::Init() registers a "customrecord" type backed by a
+  // two-field class, which makes it a ready-made record fixture.
+  MockTypeResolver resolver;
+  ASSERT_EQ(resolver.Init(), NANOARROW_OK);
+
+  const uint32_t record_oid = resolver.GetOID(PostgresTypeId::kRecord);
+  PostgresType record;
+  EXPECT_EQ(resolver.Find(record_oid, &record, nullptr), NANOARROW_OK);
+  EXPECT_EQ(record.oid(), record_oid);
+  EXPECT_EQ(record.n_children(), 2);
+
+  // Fields come back in class order, named and typed as registered.
+  const std::pair<std::string, PostgresTypeId> expected_fields[] = {
+      {"int4_col", PostgresTypeId::kInt4}, {"text_col", PostgresTypeId::kText}};
+  for (int i = 0; i < 2; i++) {
+    EXPECT_EQ(record.child(i).field_name(), expected_fields[i].first);
+    EXPECT_EQ(record.child(i).type_id(), expected_fields[i].second);
+  }
+}
diff --git a/c/driver/postgresql/postgresql.cc
b/c/driver/postgresql/postgresql.cc
index d4be5ce..8cb998d 100644
--- a/c/driver/postgresql/postgresql.cc
+++ b/c/driver/postgresql/postgresql.cc
@@ -307,8 +307,6 @@ AdbcStatusCode PostgresStatementExecutePartitions(struct
AdbcStatement* statemen
int64_t* rows_affected,
struct AdbcError* error) {
if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 900b23e..c88be68 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -30,7 +30,7 @@
#include <nanoarrow/nanoarrow.h>
#include "connection.h"
-#include "type.h"
+#include "postgres_type.h"
#include "util.h"
namespace adbcpq {
@@ -105,57 +105,23 @@ struct Handle {
};
/// Build an Arrow schema from a PostgreSQL result set
-AdbcStatusCode InferSchema(const TypeMapping& type_mapping, PGresult* result,
+AdbcStatusCode InferSchema(const PostgresTypeResolver& type_resolver,
PGresult* result,
struct ArrowSchema* out, struct AdbcError* error) {
+ ArrowError na_error;
const int num_fields = PQnfields(result);
ArrowSchemaInit(out);
CHECK_NA_ADBC(ArrowSchemaSetTypeStruct(out, num_fields), error);
for (int i = 0; i < num_fields; i++) {
- ArrowType field_type = NANOARROW_TYPE_NA;
- const Oid pg_type = PQftype(result, i);
-
- auto it = type_mapping.type_mapping.find(pg_type);
- if (it == type_mapping.type_mapping.end()) {
+ const Oid pg_oid = PQftype(result, i);
+ PostgresType pg_type;
+ if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
SetError(error, "Column #", i + 1, " (\"", PQfname(result, i),
- "\") has unknown type code ", pg_type);
+ "\") has unknown type code ", pg_oid);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
- switch (it->second) {
- // TODO: this mapping will eventually have to become dynamic,
- // because of complex types like arrays/records
- case PgType::kBool:
- field_type = NANOARROW_TYPE_BOOL;
- break;
- case PgType::kFloat4:
- field_type = NANOARROW_TYPE_FLOAT;
- break;
- case PgType::kFloat8:
- field_type = NANOARROW_TYPE_DOUBLE;
- break;
- case PgType::kInt2:
- field_type = NANOARROW_TYPE_INT16;
- break;
- case PgType::kInt4:
- field_type = NANOARROW_TYPE_INT32;
- break;
- case PgType::kInt8:
- field_type = NANOARROW_TYPE_INT64;
- break;
- case PgType::kVarBinary:
- field_type = NANOARROW_TYPE_BINARY;
- break;
- case PgType::kText:
- case PgType::kVarChar:
- field_type = NANOARROW_TYPE_STRING;
- break;
- default:
- SetError(error, "Column #", i + 1, " (\"", PQfname(result, i),
- "\") has unimplemented type code ", pg_type);
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- CHECK_NA_ADBC(ArrowSchemaSetType(out->children[i], field_type), error);
- CHECK_NA_ADBC(ArrowSchemaSetName(out->children[i], PQfname(result, i)),
error);
+ CHECK_NA_ADBC(pg_type.WithFieldName(PQfname(result,
i)).SetSchema(out->children[i]),
+ error);
}
return ADBC_STATUS_OK;
}
@@ -206,7 +172,8 @@ struct BindStream {
return std::move(callback)();
}
- AdbcStatusCode SetParamTypes(const TypeMapping& type_mapping, struct
AdbcError* error) {
+ AdbcStatusCode SetParamTypes(const PostgresTypeResolver& type_resolver,
+ struct AdbcError* error) {
param_types.resize(bind_schema->n_children);
param_values.resize(bind_schema->n_children);
param_lengths.resize(bind_schema->n_children);
@@ -214,41 +181,40 @@ struct BindStream {
param_values_offsets.reserve(bind_schema->n_children);
for (size_t i = 0; i < bind_schema_fields.size(); i++) {
- PgType pg_type;
+ PostgresTypeId type_id;
switch (bind_schema_fields[i].type) {
case ArrowType::NANOARROW_TYPE_INT16:
- pg_type = PgType::kInt2;
+ type_id = PostgresTypeId::kInt2;
param_lengths[i] = 2;
break;
case ArrowType::NANOARROW_TYPE_INT32:
- pg_type = PgType::kInt4;
+ type_id = PostgresTypeId::kInt4;
param_lengths[i] = 4;
break;
case ArrowType::NANOARROW_TYPE_INT64:
- pg_type = PgType::kInt8;
+ type_id = PostgresTypeId::kInt8;
param_lengths[i] = 8;
break;
case ArrowType::NANOARROW_TYPE_DOUBLE:
- pg_type = PgType::kFloat8;
+ type_id = PostgresTypeId::kFloat8;
param_lengths[i] = 8;
break;
case ArrowType::NANOARROW_TYPE_STRING:
- pg_type = PgType::kText;
+ type_id = PostgresTypeId::kText;
param_lengths[i] = 0;
break;
default:
- // TODO: data type to string
SetError(error, "Field #", i + 1, " ('",
bind_schema->children[i]->name,
- "') has unsupported parameter type ",
bind_schema_fields[i].type);
+ "') has unsupported parameter type ",
+ ArrowTypeString(bind_schema_fields[i].type));
return ADBC_STATUS_NOT_IMPLEMENTED;
}
- param_types[i] = type_mapping.GetOid(pg_type);
+ param_types[i] = type_resolver.GetOID(type_id);
if (param_types[i] == 0) {
- // TODO: data type to string
SetError(error, "Field #", i + 1, " ('",
bind_schema->children[i]->name,
"') has type with no corresponding PostgreSQL type ",
- bind_schema_fields[i].type);
+ ArrowTypeString(bind_schema_fields[i].type));
return ADBC_STATUS_NOT_IMPLEMENTED;
}
}
@@ -421,7 +387,7 @@ int TupleReader::GetNext(struct ArrowArray* out) {
kPgCopyBinarySignature.size() + sizeof(uint32_t) + sizeof(uint32_t);
// https://www.postgresql.org/docs/14/sql-copy.html#id-1.9.3.55.9.4.5
const int size = PQgetCopyData(conn_, &pgbuf_, /*async=*/0);
- if (size < kPqHeaderLength) {
+ if (size < static_cast<int>(kPqHeaderLength)) {
return EIO;
} else if (std::strcmp(pgbuf_, kPgCopyBinarySignature.data()) != 0) {
return EIO;
@@ -676,7 +642,7 @@ AdbcStatusCode PostgresStatement::New(struct
AdbcConnection* connection,
}
connection_ =
*reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- type_mapping_ = connection_->type_mapping();
+ type_resolver_ = connection_->type_resolver();
reader_.conn_ = connection_->conn();
return ADBC_STATUS_OK;
}
@@ -785,7 +751,7 @@ AdbcStatusCode PostgresStatement::ExecutePreparedStatement(
std::memset(&bind_, 0, sizeof(bind_));
CHECK(bind_stream.Begin([&]() { return ADBC_STATUS_OK; }, error));
- CHECK(bind_stream.SetParamTypes(*type_mapping_, error));
+ CHECK(bind_stream.SetParamTypes(*type_resolver_, error));
CHECK(bind_stream.Prepare(connection_->conn(), query_, error));
CHECK(bind_stream.Execute(connection_->conn(), rows_affected, error));
return ADBC_STATUS_OK;
@@ -836,7 +802,7 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct
ArrowArrayStream* stream,
PQclear(result);
return ADBC_STATUS_IO;
}
- AdbcStatusCode status = InferSchema(*type_mapping_, result,
&reader_.schema_, error);
+ AdbcStatusCode status = InferSchema(*type_resolver_, result,
&reader_.schema_, error);
PQclear(result);
if (status != ADBC_STATUS_OK) return status;
}
@@ -882,7 +848,7 @@ AdbcStatusCode
PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
return ADBC_STATUS_OK;
},
error));
- CHECK(bind_stream.SetParamTypes(*type_mapping_, error));
+ CHECK(bind_stream.SetParamTypes(*type_resolver_, error));
std::string insert = "INSERT INTO ";
insert += ingest_.target;
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index bb39d0d..1ad4d82 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -26,7 +26,7 @@
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.h>
-#include "type.h"
+#include "postgres_type.h"
namespace adbcpq {
class PostgresConnection;
@@ -100,7 +100,7 @@ class PostgresStatement {
struct AdbcError* error);
private:
- std::shared_ptr<TypeMapping> type_mapping_;
+ std::shared_ptr<PostgresTypeResolver> type_resolver_;
std::shared_ptr<PostgresConnection> connection_;
// Query state
diff --git a/c/driver/postgresql/type.cc b/c/driver/postgresql/type.cc
deleted file mode 100644
index b246604..0000000
--- a/c/driver/postgresql/type.cc
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "type.h"
-
-#include <cstring>
-
-namespace adbcpq {
-void TypeMapping::Insert(uint32_t oid, const char* typname, const char*
typreceive) {
- PgType type;
- if (FromPgTypreceive(typreceive, &type)) {
- type_mapping[oid] = type;
- }
-
- // Record 'canonical' types
- if (std::strcmp(typname, "int4") == 0) {
- // DCHECK_EQ(type, PgType::kInt4);
- canonical_types[static_cast<int32_t>(PgType::kInt4)] = oid;
- } else if (std::strcmp(typname, "int8") == 0) {
- // DCHECK_EQ(type, PgType::kInt8);
- canonical_types[static_cast<int32_t>(PgType::kInt8)] = oid;
- } else if (std::strcmp(typname, "float8") == 0) {
- // DCHECK_EQ(type, PgType::kFloat8);
- canonical_types[static_cast<int32_t>(PgType::kFloat8)] = oid;
- } else if (std::strcmp(typname, "text") == 0) {
- canonical_types[static_cast<int32_t>(PgType::kText)] = oid;
- }
- // TODO: fill in remainder
-}
-
-uint32_t TypeMapping::GetOid(PgType type) const {
- auto it = canonical_types.find(static_cast<int32_t>(type));
- if (it == canonical_types.end()) {
- return 0;
- }
- return it->second;
-}
-
-bool FromPgTypreceive(const char* typreceive, PgType* out) {
- if (std::strcmp(typreceive, "bitrecv") == 0) {
- *out = PgType::kBit;
- } else if (std::strcmp(typreceive, "bytearecv") == 0) {
- *out = PgType::kVarBinary;
- } else if (std::strcmp(typreceive, "boolrecv") == 0) {
- *out = PgType::kBool;
- } else if (std::strcmp(typreceive, "bpcharrecv") == 0) {
- *out = PgType::kVarChar;
- } else if (std::strcmp(typreceive, "date_recv") == 0) {
- *out = PgType::kDate;
- } else if (std::strcmp(typreceive, "float4recv") == 0) {
- *out = PgType::kFloat4;
- } else if (std::strcmp(typreceive, "float8recv") == 0) {
- *out = PgType::kFloat8;
- } else if (std::strcmp(typreceive, "int2recv") == 0) {
- *out = PgType::kInt2;
- } else if (std::strcmp(typreceive, "int4recv") == 0) {
- *out = PgType::kInt4;
- } else if (std::strcmp(typreceive, "int8recv") == 0) {
- *out = PgType::kInt8;
- } else if (std::strcmp(typreceive, "textrecv") == 0) {
- *out = PgType::kText;
- } else if (std::strcmp(typreceive, "time_recv") == 0) {
- *out = PgType::kTime;
- } else if (std::strcmp(typreceive, "timestamp_recv") == 0) {
- *out = PgType::kTimestamp;
- } else if (std::strcmp(typreceive, "timestamptz_recv") == 0) {
- *out = PgType::kTimestampTz;
- } else if (std::strcmp(typreceive, "timetz_recv") == 0) {
- *out = PgType::kTimeTz;
- } else if (std::strcmp(typreceive, "varcharrecv") == 0) {
- *out = PgType::kVarChar;
- } else {
- return false;
- }
- return true;
-}
-
-} // namespace adbcpq
diff --git a/c/driver/postgresql/type.h b/c/driver/postgresql/type.h
deleted file mode 100644
index 1f2ce70..0000000
--- a/c/driver/postgresql/type.h
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cstdint>
-#include <unordered_map>
-
-#include <nanoarrow/nanoarrow.h>
-
-namespace adbcpq {
-
-enum class PgType : uint8_t {
- // TODO: is there a good null type?
- kBit,
- kBool,
- kDate,
- kFloat4,
- kFloat8,
- kInt2,
- kInt4,
- kInt8,
- kText,
- kTime,
- kTimestamp,
- kTimestampTz,
- kTimeTz,
- kVarBinary,
- kVarChar,
-};
-
-struct TypeMapping {
- // Maps PostgreSQL type OIDs to a standardized type name
- // Example: int8 == 20
- std::unordered_map<uint32_t, PgType> type_mapping;
- // Maps standardized type names to the PostgreSQL type OID to use
- // Example: kInt8 == 20
- // We can't use enum PgType as the key because enums don't have a hash
- // implementation on gcc 4.8 (i.e., R 3.6 on Windows)
- std::unordered_map<int32_t, uint32_t> canonical_types;
-
- void Insert(uint32_t oid, const char* typname, const char* typreceive);
- /// \return 0 if not found
- uint32_t GetOid(PgType type) const;
-};
-
-bool FromPgTypreceive(const char* typreceive, PgType* out);
-
-} // namespace adbcpq
diff --git a/c/driver/postgresql/util.h b/c/driver/postgresql/util.h
index d8729fd..c5982f0 100644
--- a/c/driver/postgresql/util.h
+++ b/c/driver/postgresql/util.h
@@ -41,12 +41,16 @@ namespace adbcpq {
#define MAKE_NAME(x, y) CONCAT(x, y)
#if defined(_WIN32) && defined(_MSC_VER)
+static inline uint16_t SwapNetworkToHost(uint16_t x) { return ntohs(x); }
+static inline uint16_t SwapHostToNetwork(uint16_t x) { return htons(x); }
static inline uint32_t SwapNetworkToHost(uint32_t x) { return ntohl(x); }
static inline uint32_t SwapHostToNetwork(uint32_t x) { return htonl(x); }
static inline uint64_t SwapNetworkToHost(uint64_t x) { return ntohll(x); }
static inline uint64_t SwapHostToNetwork(uint64_t x) { return htonll(x); }
#elif defined(_WIN32)
// e.g., msys2, where ntohll is not necessarily defined
+static inline uint16_t SwapNetworkToHost(uint16_t x) { return ntohs(x); }
+static inline uint16_t SwapHostToNetwork(uint16_t x) { return htons(x); }
static inline uint32_t SwapNetworkToHost(uint32_t x) { return ntohl(x); }
static inline uint32_t SwapHostToNetwork(uint32_t x) { return htonl(x); }
static inline uint64_t SwapNetworkToHost(uint64_t x) {
@@ -57,11 +61,15 @@ static inline uint64_t SwapNetworkToHost(uint64_t x) {
}
static inline uint64_t SwapHostToNetwork(uint64_t x) { return
SwapNetworkToHost(x); }
#elif defined(__APPLE__)
+static inline uint16_t SwapNetworkToHost(uint16_t x) { return
OSSwapBigToHostInt16(x); }
+static inline uint16_t SwapHostToNetwork(uint16_t x) { return
OSSwapHostToBigInt16(x); }
static inline uint32_t SwapNetworkToHost(uint32_t x) { return
OSSwapBigToHostInt32(x); }
static inline uint32_t SwapHostToNetwork(uint32_t x) { return
OSSwapHostToBigInt32(x); }
static inline uint64_t SwapNetworkToHost(uint64_t x) { return
OSSwapBigToHostInt64(x); }
static inline uint64_t SwapHostToNetwork(uint64_t x) { return
OSSwapHostToBigInt64(x); }
#else
+static inline uint16_t SwapNetworkToHost(uint16_t x) { return be16toh(x); }
+static inline uint16_t SwapHostToNetwork(uint16_t x) { return htobe16(x); }
static inline uint32_t SwapNetworkToHost(uint32_t x) { return be32toh(x); }
static inline uint32_t SwapHostToNetwork(uint32_t x) { return htobe32(x); }
static inline uint64_t SwapNetworkToHost(uint64_t x) { return be64toh(x); }
diff --git a/docker-compose.yml b/docker-compose.yml
index 55cc5c5..77ca8ae 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -21,7 +21,7 @@ services:
# These reuse Arrow's images for simplicity. You won't be able to
# build the image from here.
- ############################ Documentation###################################
+ ############################ Documentation
###################################
docs:
image: condaforge/mambaforge:latest
diff --git a/r/adbcpostgresql/bootstrap.R b/r/adbcpostgresql/bootstrap.R
index 030157e..b7c84fc 100644
--- a/r/adbcpostgresql/bootstrap.R
+++ b/r/adbcpostgresql/bootstrap.R
@@ -20,8 +20,7 @@
files_to_vendor <- c(
"../../adbc.h",
"../../c/driver/postgresql/util.h",
- "../../c/driver/postgresql/type.h",
- "../../c/driver/postgresql/type.cc",
+ "../../c/driver/postgresql/postgres_type.h",
"../../c/driver/postgresql/statement.h",
"../../c/driver/postgresql/statement.cc",
"../../c/driver/postgresql/connection.h",
@@ -30,6 +29,7 @@ files_to_vendor <- c(
"../../c/driver/postgresql/database.cc",
"../../c/driver/postgresql/postgresql.cc",
"../../c/vendor/nanoarrow/nanoarrow.h",
+ "../../c/vendor/nanoarrow/nanoarrow.hpp",
"../../c/vendor/nanoarrow/nanoarrow.c"
)
@@ -50,8 +50,12 @@ if (all(file.exists(files_to_vendor))) {
if (all(file.copy(files_to_vendor, "src"))) {
file.rename(
- c("src/nanoarrow.c", "src/nanoarrow.h"),
- c("src/nanoarrow/nanoarrow.c", "src/nanoarrow/nanoarrow.h")
+ c("src/nanoarrow.c", "src/nanoarrow.h", "src/nanoarrow.hpp"),
+ c(
+ "src/nanoarrow/nanoarrow.c",
+ "src/nanoarrow/nanoarrow.h",
+ "src/nanoarrow/nanoarrow.hpp"
+ )
)
cat("All files successfully copied to src/\n")
} else {
diff --git a/r/adbcpostgresql/src/.gitignore b/r/adbcpostgresql/src/.gitignore
index 45c6e7e..1e652d7 100644
--- a/r/adbcpostgresql/src/.gitignore
+++ b/r/adbcpostgresql/src/.gitignore
@@ -22,10 +22,10 @@ adbc.h
connection.cc
connection.h
database.h
+database.cc
postgresql.cc
statement.h
statement.cc
-type.cc
-type.h
+postgres_type.h
util.h
Makevars
diff --git a/r/adbcpostgresql/src/Makevars.in b/r/adbcpostgresql/src/Makevars.in
index fe9d8e5..c062d11 100644
--- a/r/adbcpostgresql/src/Makevars.in
+++ b/r/adbcpostgresql/src/Makevars.in
@@ -22,6 +22,5 @@ OBJECTS = init.o \
connection.o \
database.o \
statement.o \
- type.o \
postgresql.o \
nanoarrow/nanoarrow.o
diff --git a/r/adbcpostgresql/src/Makevars.ucrt
b/r/adbcpostgresql/src/Makevars.ucrt
index ef55b83..0fc2d0c 100644
--- a/r/adbcpostgresql/src/Makevars.ucrt
+++ b/r/adbcpostgresql/src/Makevars.ucrt
@@ -22,6 +22,5 @@ OBJECTS = init.o \
connection.o \
database.o \
statement.o \
- type.o \
postgresql.o \
nanoarrow/nanoarrow.o
diff --git a/r/adbcpostgresql/src/Makevars.win
b/r/adbcpostgresql/src/Makevars.win
index e930737..a45dc51 100644
--- a/r/adbcpostgresql/src/Makevars.win
+++ b/r/adbcpostgresql/src/Makevars.win
@@ -25,7 +25,6 @@ OBJECTS = init.o \
connection.o \
database.o \
statement.o \
- type.o \
postgresql.o \
nanoarrow/nanoarrow.o
diff --git a/r/adbcpostgresql/src/database.cc b/r/adbcpostgresql/src/database.cc
deleted file mode 100644
index bc5e0ec..0000000
--- a/r/adbcpostgresql/src/database.cc
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "database.h"
-
-#include <cstring>
-#include <memory>
-
-#include <adbc.h>
-#include <libpq-fe.h>
-#include <nanoarrow/nanoarrow.h>
-
-#include "util.h"
-
-namespace adbcpq {
-
-PostgresDatabase::PostgresDatabase() : open_connections_(0) {
- type_mapping_ = std::make_shared<TypeMapping>();
-}
-PostgresDatabase::~PostgresDatabase() = default;
-
-AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
- // Connect to validate the parameters.
- PGconn* conn = nullptr;
- AdbcStatusCode final_status = Connect(&conn, error);
- if (final_status != ADBC_STATUS_OK) {
- return final_status;
- }
-
- // Build the type mapping table.
- const std::string kTypeQuery = R"(
-SELECT
- oid,
- typname,
- typreceive
-FROM
- pg_catalog.pg_type
-)";
-
- pg_result* result = PQexec(conn, kTypeQuery.c_str());
- ExecStatusType pq_status = PQresultStatus(result);
- if (pq_status == PGRES_TUPLES_OK) {
- int num_rows = PQntuples(result);
- for (int row = 0; row < num_rows; row++) {
- const uint32_t oid = static_cast<uint32_t>(
- std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr,
/*base=*/10));
- const char* typname = PQgetvalue(result, row, 1);
- const char* typreceive = PQgetvalue(result, row, 2);
-
- type_mapping_->Insert(oid, typname, typreceive);
- }
- } else {
- SetError(error, "Failed to build type mapping table: ",
PQerrorMessage(conn));
- final_status = ADBC_STATUS_IO;
- }
- PQclear(result);
-
- // Disconnect since PostgreSQL connections can be heavy.
- {
- AdbcStatusCode status = Disconnect(&conn, error);
- if (status != ADBC_STATUS_OK) final_status = status;
- }
- return final_status;
-}
-
-AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
- if (open_connections_ != 0) {
- SetError(error, "Database released with ", open_connections_, " open
connections");
- return ADBC_STATUS_INVALID_STATE;
- }
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresDatabase::SetOption(const char* key, const char* value,
- struct AdbcError* error) {
- if (strcmp(key, "uri") == 0) {
- uri_ = value;
- } else {
- SetError(error, "Unknown database option ", key);
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresDatabase::Connect(PGconn** conn, struct AdbcError*
error) {
- if (uri_.empty()) {
- SetError(error, "Must set database option 'uri' before creating a
connection");
- return ADBC_STATUS_INVALID_STATE;
- }
- *conn = PQconnectdb(uri_.c_str());
- if (PQstatus(*conn) != CONNECTION_OK) {
- SetError(error, "Failed to connect: ", PQerrorMessage(*conn));
- PQfinish(*conn);
- *conn = nullptr;
- return ADBC_STATUS_IO;
- }
- open_connections_++;
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError*
error) {
- PQfinish(*conn);
- *conn = nullptr;
- if (--open_connections_ < 0) {
- SetError(error, "Open connection count underflowed");
- return ADBC_STATUS_INTERNAL;
- }
- return ADBC_STATUS_OK;
-}
-} // namespace adbcpq
diff --git a/r/adbcpostgresql/src/nanoarrow/.gitignore
b/r/adbcpostgresql/src/nanoarrow/.gitignore
index 87e59e2..632b0d8 100644
--- a/r/adbcpostgresql/src/nanoarrow/.gitignore
+++ b/r/adbcpostgresql/src/nanoarrow/.gitignore
@@ -17,3 +17,4 @@
nanoarrow.c
nanoarrow.h
+nanoarrow.hpp
diff --git a/r/adbcpostgresql/src/postgresql.cc
b/r/adbcpostgresql/src/postgresql.cc
deleted file mode 100644
index d4be5ce..0000000
--- a/r/adbcpostgresql/src/postgresql.cc
+++ /dev/null
@@ -1,499 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// A libpq-based PostgreSQL driver for ADBC.
-
-#include <cstring>
-#include <memory>
-
-#include <adbc.h>
-
-#include "connection.h"
-#include "database.h"
-#include "statement.h"
-#include "util.h"
-
-using adbcpq::PostgresConnection;
-using adbcpq::PostgresDatabase;
-using adbcpq::PostgresStatement;
-
-// ---------------------------------------------------------------------
-// ADBC interface implementation - as private functions so that these
-// don't get replaced by the dynamic linker. If we implemented these
-// under the Adbc* names, then DriverInit, the linker may resolve
-// functions to the address of the functions provided by the driver
-// manager instead of our functions.
-//
-// We could also:
-// - Play games with RTLD_DEEPBIND - but this doesn't work with ASan
-// - Use __attribute__((visibility("protected"))) - but this is
-// apparently poorly supported by some linkers
-// - Play with -Bsymbolic(-functions) - but this has other
-// consequences and complicates the build setup
-//
-// So in the end some manual effort here was chosen.
-
-// ---------------------------------------------------------------------
-// AdbcDatabase
-
-namespace {
-using adbcpq::SetError;
-AdbcStatusCode PostgresDatabaseInit(struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
- return (*ptr)->Init(error);
-}
-
-AdbcStatusCode PostgresDatabaseNew(struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!database) {
- SetError(error, "database must not be null");
- return ADBC_STATUS_INVALID_STATE;
- }
- if (database->private_data) {
- SetError(error, "database is already initialized");
- return ADBC_STATUS_INVALID_STATE;
- }
- auto impl = std::make_shared<PostgresDatabase>();
- database->private_data = new std::shared_ptr<PostgresDatabase>(impl);
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresDatabaseRelease(struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
- AdbcStatusCode status = (*ptr)->Release(error);
- delete ptr;
- database->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode PostgresDatabaseSetOption(struct AdbcDatabase* database, const
char* key,
- const char* value, struct AdbcError*
error) {
- if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
- return (*ptr)->SetOption(key, value, error);
-}
-} // namespace
-
-AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct
AdbcError* error) {
- return PostgresDatabaseInit(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct
AdbcError* error) {
- return PostgresDatabaseNew(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
- struct AdbcError* error) {
- return PostgresDatabaseRelease(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const
char* key,
- const char* value, struct AdbcError*
error) {
- return PostgresDatabaseSetOption(database, key, value, error);
-}
-
-// ---------------------------------------------------------------------
-// AdbcConnection
-
-namespace {
-AdbcStatusCode PostgresConnectionCommit(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->Commit(error);
-}
-
-AdbcStatusCode PostgresConnectionGetInfo(struct AdbcConnection* connection,
- uint32_t* info_codes, size_t
info_codes_length,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresConnectionGetObjects(
- struct AdbcConnection* connection, int depth, const char* catalog,
- const char* db_schema, const char* table_name, const char** table_types,
- const char* column_name, struct ArrowArrayStream* stream, struct
AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresConnectionGetTableSchema(
- struct AdbcConnection* connection, const char* catalog, const char*
db_schema,
- const char* table_name, struct ArrowSchema* schema, struct AdbcError*
error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->GetTableSchema(catalog, db_schema, table_name, schema, error);
-}
-
-AdbcStatusCode PostgresConnectionGetTableTypes(struct AdbcConnection*
connection,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresConnectionInit(struct AdbcConnection* connection,
- struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->Init(database, error);
-}
-
-AdbcStatusCode PostgresConnectionNew(struct AdbcConnection* connection,
- struct AdbcError* error) {
- auto impl = std::make_shared<PostgresConnection>();
- connection->private_data = new std::shared_ptr<PostgresConnection>(impl);
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresConnectionReadPartition(struct AdbcConnection*
connection,
- const uint8_t*
serialized_partition,
- size_t serialized_length,
- struct ArrowArrayStream* out,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresConnectionRelease(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- AdbcStatusCode status = (*ptr)->Release(error);
- delete ptr;
- connection->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->Rollback(error);
-}
-
-AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
- const char* key, const char* value,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
-
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->SetOption(key, value, error);
-}
-
-} // namespace
-AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return PostgresConnectionCommit(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
- uint32_t* info_codes, size_t
info_codes_length,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return PostgresConnectionGetInfo(connection, info_codes, info_codes_length,
stream,
- error);
-}
-
-AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int
depth,
- const char* catalog, const char*
db_schema,
- const char* table_name, const char**
table_types,
- const char* column_name,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return PostgresConnectionGetObjects(connection, depth, catalog, db_schema,
table_name,
- table_types, column_name, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
- const char* catalog, const char*
db_schema,
- const char* table_name,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- return PostgresConnectionGetTableSchema(connection, catalog, db_schema,
table_name,
- schema, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return PostgresConnectionGetTableTypes(connection, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
- struct AdbcDatabase* database,
- struct AdbcError* error) {
- return PostgresConnectionInit(connection, database, error);
-}
-
-AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return PostgresConnectionNew(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
- const uint8_t* serialized_partition,
- size_t serialized_length,
- struct ArrowArrayStream* out,
- struct AdbcError* error) {
- return PostgresConnectionReadPartition(connection, serialized_partition,
- serialized_length, out, error);
-}
-
-AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return PostgresConnectionRelease(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return PostgresConnectionRollback(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection,
const char* key,
- const char* value, struct AdbcError*
error) {
- return PostgresConnectionSetOption(connection, key, value, error);
-}
-
-// ---------------------------------------------------------------------
-// AdbcStatement
-
-namespace {
-AdbcStatusCode PostgresStatementBind(struct AdbcStatement* statement,
- struct ArrowArray* values,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->Bind(values, schema, error);
-}
-
-AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->Bind(stream, error);
-}
-
-AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement*
statement,
- struct ArrowSchema* schema,
- struct AdbcPartitions*
partitions,
- int64_t* rows_affected,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
- struct ArrowArrayStream* output,
- int64_t* rows_affected,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->ExecuteQuery(output, rows_affected, error);
-}
-
-AdbcStatusCode PostgresStatementGetPartitionDesc(struct AdbcStatement*
statement,
- uint8_t* partition_desc,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresStatementGetPartitionDescSize(struct AdbcStatement*
statement,
- size_t* length,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode PostgresStatementGetParameterSchema(struct AdbcStatement*
statement,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->GetParameterSchema(schema, error);
-}
-
-AdbcStatusCode PostgresStatementNew(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error) {
- auto impl = std::make_shared<PostgresStatement>();
- statement->private_data = new std::shared_ptr<PostgresStatement>(impl);
- return impl->New(connection, error);
-}
-
-AdbcStatusCode PostgresStatementPrepare(struct AdbcStatement* statement,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->Prepare(error);
-}
-
-AdbcStatusCode PostgresStatementRelease(struct AdbcStatement* statement,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- auto status = (*ptr)->Release(error);
- delete ptr;
- statement->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode PostgresStatementSetOption(struct AdbcStatement* statement,
- const char* key, const char* value,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->SetOption(key, value, error);
-}
-
-AdbcStatusCode PostgresStatementSetSqlQuery(struct AdbcStatement* statement,
- const char* query, struct
AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
-
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
- return (*ptr)->SetSqlQuery(query, error);
-}
-} // namespace
-
-AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
- struct ArrowArray* values, struct
ArrowSchema* schema,
- struct AdbcError* error) {
- return PostgresStatementBind(statement, values, schema, error);
-}
-
-AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return PostgresStatementBindStream(statement, stream, error);
-}
-
-AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
- ArrowSchema* schema,
- struct AdbcPartitions*
partitions,
- int64_t* rows_affected,
- struct AdbcError* error) {
- return PostgresStatementExecutePartitions(statement, schema, partitions,
rows_affected,
- error);
-}
-
-AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
- struct ArrowArrayStream* output,
- int64_t* rows_affected,
- struct AdbcError* error) {
- return PostgresStatementExecuteQuery(statement, output, rows_affected,
error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
- uint8_t* partition_desc,
- struct AdbcError* error) {
- return PostgresStatementGetPartitionDesc(statement, partition_desc, error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement*
statement,
- size_t* length,
- struct AdbcError* error) {
- return PostgresStatementGetPartitionDescSize(statement, length, error);
-}
-
-AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- return PostgresStatementGetParameterSchema(statement, schema, error);
-}
-
-AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error) {
- return PostgresStatementNew(connection, statement, error);
-}
-
-AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
- struct AdbcError* error) {
- return PostgresStatementPrepare(statement, error);
-}
-
-AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
- struct AdbcError* error) {
- return PostgresStatementRelease(statement, error);
-}
-
-AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const
char* key,
- const char* value, struct AdbcError*
error) {
- return PostgresStatementSetOption(statement, key, value, error);
-}
-
-AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
- const char* query, struct AdbcError*
error) {
- return PostgresStatementSetSqlQuery(statement, query, error);
-}
-
-extern "C" {
-ADBC_EXPORT
-AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError*
error) {
- if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
-
- auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
- std::memset(driver, 0, sizeof(*driver));
- driver->DatabaseInit = PostgresDatabaseInit;
- driver->DatabaseNew = PostgresDatabaseNew;
- driver->DatabaseRelease = PostgresDatabaseRelease;
- driver->DatabaseSetOption = PostgresDatabaseSetOption;
-
- driver->ConnectionCommit = PostgresConnectionCommit;
- driver->ConnectionGetInfo = PostgresConnectionGetInfo;
- driver->ConnectionGetObjects = PostgresConnectionGetObjects;
- driver->ConnectionGetTableSchema = PostgresConnectionGetTableSchema;
- driver->ConnectionGetTableTypes = PostgresConnectionGetTableTypes;
- driver->ConnectionInit = PostgresConnectionInit;
- driver->ConnectionNew = PostgresConnectionNew;
- driver->ConnectionReadPartition = PostgresConnectionReadPartition;
- driver->ConnectionRelease = PostgresConnectionRelease;
- driver->ConnectionRollback = PostgresConnectionRollback;
- driver->ConnectionSetOption = PostgresConnectionSetOption;
-
- driver->StatementBind = PostgresStatementBind;
- driver->StatementBindStream = PostgresStatementBindStream;
- driver->StatementExecutePartitions = PostgresStatementExecutePartitions;
- driver->StatementExecuteQuery = PostgresStatementExecuteQuery;
- driver->StatementGetParameterSchema = PostgresStatementGetParameterSchema;
- driver->StatementNew = PostgresStatementNew;
- driver->StatementPrepare = PostgresStatementPrepare;
- driver->StatementRelease = PostgresStatementRelease;
- driver->StatementSetOption = PostgresStatementSetOption;
- driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery;
- return ADBC_STATUS_OK;
-}
-}