This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 448175e refactor(c/driver/postgresql): Postgres class helper for
GetObjects (#711)
448175e is described below
commit 448175ee3dea6dde4e126bca847383e4a0f929f9
Author: William Ayd <[email protected]>
AuthorDate: Fri May 26 12:34:47 2023 -0700
refactor(c/driver/postgresql): Postgres class helper for GetObjects (#711)
Not quite what is being done in
https://github.com/apache/arrow-adbc/blob/main/go/adbc/driver/internal/shared_utils.go
but I think it gets us closer.
---
c/driver/postgresql/connection.cc | 338 +++++++++++++++++++++-----------------
1 file changed, 190 insertions(+), 148 deletions(-)
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index 2cee78e..4730721 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -126,6 +126,192 @@ class PqResultHelper {
std::string query_;
};
+class PqGetObjectsHelper {
+ public:
+ PqGetObjectsHelper(PGconn* conn, int depth, const char* catalog, const char*
db_schema,
+ const char* table_name, const char** table_types,
+ const char* column_name, struct ArrowSchema* schema,
+ struct ArrowArray* array, struct AdbcError* error)
+ : conn_(conn),
+ depth_(depth),
+ catalog_(catalog),
+ db_schema_(db_schema),
+ table_name_(table_name),
+ table_types_(table_types),
+ column_name_(column_name),
+ schema_(schema),
+ array_(array),
+ error_(error) {
+ na_error_ = {0};
+ }
+
+ AdbcStatusCode GetObjects() {
+ PqResultHelper curr_db_helper = PqResultHelper{conn_, "SELECT
current_database()"};
+ if (curr_db_helper.Status() == PGRES_TUPLES_OK) {
+ assert(curr_db_helper.NumRows() == 1);
+ auto curr_iter = curr_db_helper.begin();
+ PqResultRow db_row = *curr_iter;
+ current_db_ = std::string(db_row[0].data);
+ } else {
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ RAISE_ADBC(InitArrowArray());
+
+ catalog_name_col_ = array_->children[0];
+ catalog_db_schemas_col_ = array_->children[1];
+ catalog_db_schemas_items_ = catalog_db_schemas_col_->children[0];
+ db_schema_name_col_ = catalog_db_schemas_items_->children[0];
+ db_schema_tables_col_ = catalog_db_schemas_items_->children[1];
+
+ RAISE_ADBC(AppendCatalogs());
+ RAISE_ADBC(FinishArrowArray());
+ return ADBC_STATUS_OK;
+ }
+
+ private:
+ AdbcStatusCode InitArrowArray() {
+ RAISE_ADBC(AdbcInitConnectionObjectsSchema(schema_, error_));
+
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array_, schema_,
&na_error_),
+ &na_error_, error_);
+
+ CHECK_NA(INTERNAL, ArrowArrayStartAppending(array_), error_);
+ return ADBC_STATUS_OK;
+ }
+
+ AdbcStatusCode AppendSchemas(std::string db_name) {
+ // postgres only allows you to list schemas for the currently connected db
+ if (db_name == current_db_) {
+ struct StringBuilder query = {0};
+ if (StringBuilderInit(&query, /*initial_size*/ 256)) {
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ const char* stmt =
+ "SELECT nspname FROM pg_catalog.pg_namespace WHERE "
+ "nspname !~ '^pg_' AND nspname <> 'information_schema'";
+
+ if (StringBuilderAppend(&query, "%s", stmt)) {
+ StringBuilderReset(&query);
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ if (db_schema_ != NULL) {
+ char* schema_name = PQescapeIdentifier(conn_, db_schema_,
strlen(db_schema_));
+ if (schema_name == NULL) {
+ SetError(error_, "%s%s", "Failed to escape schema: ",
PQerrorMessage(conn_));
+ StringBuilderReset(&query);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ int res =
+ StringBuilderAppend(&query, "%s%s%s", " AND nspname ='",
schema_name, "'");
+ PQfreemem(schema_name);
+ if (res) {
+ return ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ auto result_helper = PqResultHelper{conn_, query.buffer};
+ StringBuilderReset(&query);
+
+ if (result_helper.Status() == PGRES_TUPLES_OK) {
+ for (PqResultRow row : result_helper) {
+ const char* schema_name = row[0].data;
+ CHECK_NA(
+ INTERNAL,
+ ArrowArrayAppendString(db_schema_name_col_,
ArrowCharView(schema_name)),
+ error_);
+ if (depth_ >= ADBC_OBJECT_DEPTH_TABLES) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ } else {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_col_, 1),
error_);
+ }
+ CHECK_NA(INTERNAL,
ArrowArrayFinishElement(catalog_db_schemas_items_), error_);
+ }
+ } else {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ }
+
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col_),
error_);
+ return ADBC_STATUS_OK;
+ }
+
+ AdbcStatusCode AppendCatalogs() {
+ struct StringBuilder query = {0};
+ if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return
ADBC_STATUS_INTERNAL;
+
+ if (StringBuilderAppend(&query, "%s", "SELECT datname FROM
pg_catalog.pg_database")) {
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ if (catalog_ != NULL) {
+ char* catalog_name = PQescapeIdentifier(conn_, catalog_,
strlen(catalog_));
+ if (catalog_name == NULL) {
+ SetError(error_, "%s%s", "Failed to escape catalog: ",
PQerrorMessage(conn_));
+ StringBuilderReset(&query);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ int res =
+ StringBuilderAppend(&query, "%s%s%s", " WHERE datname = '",
catalog_name, "'");
+ PQfreemem(catalog_name);
+ if (res) {
+ return ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ PqResultHelper result_helper = PqResultHelper{conn_, query.buffer};
+ StringBuilderReset(&query);
+
+ if (result_helper.Status() == PGRES_TUPLES_OK) {
+ for (PqResultRow row : result_helper) {
+ const char* db_name = row[0].data;
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(catalog_name_col_,
ArrowCharView(db_name)),
+ error_);
+ if (depth_ == ADBC_OBJECT_DEPTH_CATALOGS) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col_, 1),
error_);
+ } else {
+ RAISE_ADBC(AppendSchemas(std::string(db_name)));
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(array_), error_);
+ }
+ } else {
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ return ADBC_STATUS_OK;
+ }
+
+ AdbcStatusCode FinishArrowArray() {
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array_,
&na_error_),
+ &na_error_, error_);
+
+ return ADBC_STATUS_OK;
+ }
+
+ PGconn* conn_;
+ int depth_;
+ const char* catalog_;
+ const char* db_schema_;
+ const char* table_name_;
+ const char** table_types_;
+ const char* column_name_;
+ struct ArrowSchema* schema_;
+ struct ArrowArray* array_;
+ struct AdbcError* error_;
+ struct ArrowError na_error_;
+ std::string current_db_;
+ struct ArrowArray* catalog_name_col_;
+ struct ArrowArray* catalog_db_schemas_col_;
+ struct ArrowArray* catalog_db_schemas_items_;
+ struct ArrowArray* db_schema_name_col_;
+ struct ArrowArray* db_schema_tables_col_;
+};
+
} // namespace
namespace adbcpq {
@@ -216,151 +402,6 @@ AdbcStatusCode PostgresConnection::GetInfo(struct
AdbcConnection* connection,
return BatchToArrayStream(&array, &schema, out, error);
}
// (Deleted by this commit; its logic now lives in
// PqGetObjectsHelper::AppendSchemas.)
// Appends one element to the `db_schemas_list` list array for catalog
// `db_name`. It first runs SELECT current_database(). Schemas are listed only
// when `db_name` equals that name; otherwise the list element is finished
// empty. Schema names come from pg_catalog.pg_namespace, excluding pg_* and
// information_schema, and are optionally filtered by `db_schema`. Each schema
// gets a null tables entry. Returns ADBC_STATUS_NOT_IMPLEMENTED when
// depth >= ADBC_OBJECT_DEPTH_TABLES, and also when the schema query fails.
// NOTE(review): `db_schema` is escaped with PQescapeIdentifier (which adds
// double quotes) and then wrapped in single quotes. The filter therefore
// compares nspname against the quoted text and will not match an ordinary
// schema name. Also, `query` is not reset when the second StringBuilderAppend
// fails, so its buffer leaks on that path.
-AdbcStatusCode PostgresConnectionGetSchemasImpl(PGconn* conn, int depth,
- const char* db_name,
- const char* db_schema,
- struct ArrowArray*
db_schemas_list,
- struct AdbcError* error) {
- struct ArrowArray* db_schema_items = db_schemas_list->children[0];
- struct ArrowArray* db_schema_names = db_schema_items->children[0];
- struct ArrowArray* db_schema_tables_list = db_schema_items->children[1];
-
- // inefficient to place here but better localized until we do a class-based
refactor
- std::string curr_db;
- PqResultHelper curr_db_helper = PqResultHelper{conn, "SELECT
current_database()"};
- if (curr_db_helper.Status() == PGRES_TUPLES_OK) {
- assert(curr_db_helper.NumRows() == 1);
- auto curr_iter = curr_db_helper.begin();
- PqResultRow db_row = *curr_iter;
- curr_db = std::string(db_row[0].data);
- } else {
- return ADBC_STATUS_INTERNAL;
- }
-
- // postgres only allows you to list schemas for the currently connected db
- if (strcmp(db_name, curr_db.c_str()) == 0) {
- struct StringBuilder query = {0};
- if (StringBuilderInit(&query, /*initial_size*/ 256)) {
- return ADBC_STATUS_INTERNAL;
- }
-
- const char* stmt =
- "SELECT nspname FROM pg_catalog.pg_namespace WHERE "
- "nspname !~ '^pg_' AND nspname <> 'information_schema'";
-
- if (StringBuilderAppend(&query, "%s", stmt)) {
- StringBuilderReset(&query);
- return ADBC_STATUS_INTERNAL;
- }
-
- if (db_schema != NULL) {
- char* schema_name = PQescapeIdentifier(conn, db_schema,
strlen(db_schema));
- if (schema_name == NULL) {
- SetError(error, "%s%s", "Failed to escape schema: ",
PQerrorMessage(conn));
- StringBuilderReset(&query);
- return ADBC_STATUS_INVALID_ARGUMENT;
- }
-
- int res =
- StringBuilderAppend(&query, "%s%s%s", " AND nspname ='",
schema_name, "'");
- PQfreemem(schema_name);
- if (res) {
- return ADBC_STATUS_INTERNAL;
- }
- }
-
- auto result_helper = PqResultHelper{conn, query.buffer};
- StringBuilderReset(&query);
-
- if (result_helper.Status() == PGRES_TUPLES_OK) {
- for (PqResultRow row : result_helper) {
- const char* schema_name = row[0].data;
- CHECK_NA(INTERNAL,
- ArrowArrayAppendString(db_schema_names,
ArrowCharView(schema_name)),
- error);
- if (depth >= ADBC_OBJECT_DEPTH_TABLES) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
- } else {
- CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_list, 1),
error);
- }
- CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_items), error);
- }
- } else {
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- }
-
- CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schemas_list), error);
-
- return ADBC_STATUS_OK;
-}
-
// (Deleted by this commit; its logic now lives in PqGetObjectsHelper.)
// Initializes `schema` via AdbcInitConnectionObjectsSchema and builds `array`
// from it. The array gets one row per database in pg_catalog.pg_database,
// optionally filtered by `catalog`. At ADBC_OBJECT_DEPTH_CATALOGS the
// db_schemas column is null; otherwise PostgresConnectionGetSchemasImpl fills
// it. `table_name`, `table_types` and `column_name` are accepted but unused.
// `db_schema_name_col` and `db_schema_tables_col` are computed but also unused.
// Returns ADBC_STATUS_INTERNAL if the database query fails.
// NOTE(review): `catalog` is escaped with PQescapeIdentifier (which adds double
// quotes) and then wrapped in single quotes. The WHERE clause therefore
// compares datname against the quoted text and will not match an ordinary
// database name. `query` is also not reset when either StringBuilderAppend
// call fails, so its buffer leaks on those paths.
-AdbcStatusCode PostgresConnectionGetObjectsImpl(
- PGconn* conn, int depth, const char* catalog, const char* db_schema,
- const char* table_name, const char** table_types, const char* column_name,
- struct ArrowSchema* schema, struct ArrowArray* array, struct AdbcError*
error) {
- RAISE_ADBC(AdbcInitConnectionObjectsSchema(schema, error));
-
- struct ArrowError na_error = {0};
- CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema,
&na_error), &na_error,
- error);
- CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
-
- struct ArrowArray* catalog_name_col = array->children[0];
- struct ArrowArray* catalog_db_schemas_col = array->children[1];
-
- struct ArrowArray* catalog_db_schemas_items =
catalog_db_schemas_col->children[0];
- struct ArrowArray* db_schema_name_col =
catalog_db_schemas_items->children[0];
- struct ArrowArray* db_schema_tables_col =
catalog_db_schemas_items->children[1];
-
- struct StringBuilder query = {0};
- if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return
ADBC_STATUS_INTERNAL;
-
- if (StringBuilderAppend(&query, "%s", "SELECT datname FROM
pg_catalog.pg_database")) {
- return ADBC_STATUS_INTERNAL;
- }
-
- if (catalog != NULL) {
- char* catalog_name = PQescapeIdentifier(conn, catalog, strlen(catalog));
- if (catalog_name == NULL) {
- SetError(error, "%s%s", "Failed to escape catalog: ",
PQerrorMessage(conn));
- StringBuilderReset(&query);
- return ADBC_STATUS_INVALID_ARGUMENT;
- }
-
- int res =
- StringBuilderAppend(&query, "%s%s%s", " WHERE datname = '",
catalog_name, "'");
- PQfreemem(catalog_name);
- if (res) {
- return ADBC_STATUS_INTERNAL;
- }
- }
-
- PqResultHelper result_helper = PqResultHelper{conn, query.buffer};
- StringBuilderReset(&query);
-
- if (result_helper.Status() == PGRES_TUPLES_OK) {
- for (PqResultRow row : result_helper) {
- const char* db_name = row[0].data;
- CHECK_NA(INTERNAL, ArrowArrayAppendString(catalog_name_col,
ArrowCharView(db_name)),
- error);
- if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
- CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col, 1),
error);
- } else {
- RAISE_ADBC(PostgresConnectionGetSchemasImpl(conn, depth, db_name,
db_schema,
- catalog_db_schemas_col,
error));
- }
- CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
- }
- } else {
- return ADBC_STATUS_INTERNAL;
- }
-
- CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array, &na_error),
&na_error,
- error);
- return ADBC_STATUS_OK;
-}
-
AdbcStatusCode PostgresConnection::GetObjects(
struct AdbcConnection* connection, int depth, const char* catalog,
const char* db_schema, const char* table_name, const char** table_types,
@@ -368,9 +409,10 @@ AdbcStatusCode PostgresConnection::GetObjects(
struct ArrowSchema schema = {0};
struct ArrowArray array = {0};
- AdbcStatusCode status =
- PostgresConnectionGetObjectsImpl(conn_, depth, catalog, db_schema,
table_name,
- table_types, column_name, &schema,
&array, error);
+ PqGetObjectsHelper helper =
+ PqGetObjectsHelper(conn_, depth, catalog, db_schema, table_name,
table_types,
+ column_name, &schema, &array, error);
+ AdbcStatusCode status = helper.GetObjects();
if (status != ADBC_STATUS_OK) {
if (schema.release) schema.release(&schema);