This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 448175e  refactor(c/driver/postgresql): Postgres class helper for GetObjects (#711)
448175e is described below

commit 448175ee3dea6dde4e126bca847383e4a0f929f9
Author: William Ayd <[email protected]>
AuthorDate: Fri May 26 12:34:47 2023 -0700

    refactor(c/driver/postgresql): Postgres class helper for GetObjects (#711)
    
    Not quite what is being done in
    https://github.com/apache/arrow-adbc/blob/main/go/adbc/driver/internal/shared_utils.go
    but I think it gets us closer.
---
 c/driver/postgresql/connection.cc | 338 +++++++++++++++++++++-----------------
 1 file changed, 190 insertions(+), 148 deletions(-)

diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc
index 2cee78e..4730721 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -126,6 +126,205 @@ class PqResultHelper {
   std::string query_;
 };
 
+class PqGetObjectsHelper {
+ public:
+  // All pointer arguments are borrowed and must outlive this helper.
+  // catalog and db_schema may be NULL, meaning "no filter"; table_name,
+  // table_types and column_name are stored but not used yet.
+  PqGetObjectsHelper(PGconn* conn, int depth, const char* catalog, const char* db_schema,
+                     const char* table_name, const char** table_types,
+                     const char* column_name, struct ArrowSchema* schema,
+                     struct ArrowArray* array, struct AdbcError* error)
+      : conn_(conn),
+        depth_(depth),
+        catalog_(catalog),
+        db_schema_(db_schema),
+        table_name_(table_name),
+        table_types_(table_types),
+        column_name_(column_name),
+        schema_(schema),
+        array_(array),
+        error_(error),
+        na_error_{} {}
+
+  // Populates schema/array with the GetObjects result. On failure they may be
+  // partially initialized; the caller is responsible for releasing them.
+  AdbcStatusCode GetObjects() {
+    PqResultHelper curr_db_helper = PqResultHelper{conn_, "SELECT current_database()"};
+    if (curr_db_helper.Status() != PGRES_TUPLES_OK) {
+      SetError(error_, "%s%s", "Failed to query current database: ",
+               PQerrorMessage(conn_));
+      return ADBC_STATUS_INTERNAL;
+    }
+    // current_database() always returns exactly one row
+    assert(curr_db_helper.NumRows() == 1);
+    auto curr_iter = curr_db_helper.begin();
+    PqResultRow db_row = *curr_iter;
+    current_db_ = std::string(db_row[0].data);
+
+    RAISE_ADBC(InitArrowArray());
+
+    catalog_name_col_ = array_->children[0];
+    catalog_db_schemas_col_ = array_->children[1];
+    catalog_db_schemas_items_ = catalog_db_schemas_col_->children[0];
+    db_schema_name_col_ = catalog_db_schemas_items_->children[0];
+    db_schema_tables_col_ = catalog_db_schemas_items_->children[1];
+
+    RAISE_ADBC(AppendCatalogs());
+    RAISE_ADBC(FinishArrowArray());
+    return ADBC_STATUS_OK;
+  }
+
+ private:
+  AdbcStatusCode InitArrowArray() {
+    RAISE_ADBC(AdbcInitConnectionObjectsSchema(schema_, error_));
+
+    CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array_, schema_, &na_error_),
+                    &na_error_, error_);
+
+    CHECK_NA(INTERNAL, ArrowArrayStartAppending(array_), error_);
+    return ADBC_STATUS_OK;
+  }
+
+  // Appends `prefix` followed by `value` quoted as a SQL string literal.
+  // PQescapeLiteral adds the enclosing quotes and escapes embedded ones,
+  // so user input cannot break out of the literal. (PQescapeIdentifier
+  // yields a double-quoted identifier: wrapped in '...' it never matches
+  // and leaves single quotes unescaped.) Resets `query` on failure.
+  AdbcStatusCode AppendEscapedLiteral(struct StringBuilder* query, const char* prefix,
+                                      const char* value, const char* what) {
+    char* literal = PQescapeLiteral(conn_, value, strlen(value));
+    if (literal == NULL) {
+      SetError(error_, "Failed to escape %s: %s", what, PQerrorMessage(conn_));
+      StringBuilderReset(query);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int res = StringBuilderAppend(query, "%s%s", prefix, literal);
+    PQfreemem(literal);
+    if (res) {
+      StringBuilderReset(query);
+      return ADBC_STATUS_INTERNAL;
+    }
+    return ADBC_STATUS_OK;
+  }
+
+  // Appends the db_schemas list for catalog `db_name`. Postgres can only
+  // list schemas of the currently connected database, so every other
+  // catalog gets an empty (non-null) list.
+  AdbcStatusCode AppendSchemas(const std::string& db_name) {
+    if (db_name == current_db_) {
+      struct StringBuilder query = {0};
+      if (StringBuilderInit(&query, /*initial_size*/ 256)) {
+        return ADBC_STATUS_INTERNAL;
+      }
+
+      const char* stmt =
+          "SELECT nspname FROM pg_catalog.pg_namespace WHERE "
+          "nspname !~ '^pg_' AND nspname <> 'information_schema'";
+
+      if (StringBuilderAppend(&query, "%s", stmt)) {
+        StringBuilderReset(&query);
+        return ADBC_STATUS_INTERNAL;
+      }
+
+      if (db_schema_ != NULL) {
+        RAISE_ADBC(AppendEscapedLiteral(&query, " AND nspname = ", db_schema_, "schema"));
+      }
+
+      PqResultHelper result_helper = PqResultHelper{conn_, query.buffer};
+      StringBuilderReset(&query);
+
+      if (result_helper.Status() != PGRES_TUPLES_OK) {
+        SetError(error_, "%s%s", "Failed to list schemas: ", PQerrorMessage(conn_));
+        return ADBC_STATUS_INTERNAL;
+      }
+
+      for (PqResultRow row : result_helper) {
+        const char* schema_name = row[0].data;
+        CHECK_NA(
+            INTERNAL,
+            ArrowArrayAppendString(db_schema_name_col_, ArrowCharView(schema_name)),
+            error_);
+        // NOTE(review): if ADBC_OBJECT_DEPTH_ALL is 0 (as in adbc.h), a depth
+        // of ALL skips this check and tables are silently omitted — confirm.
+        if (depth_ >= ADBC_OBJECT_DEPTH_TABLES) {
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+        }
+        CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_col_, 1), error_);
+        CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_items_), error_);
+      }
+    }
+
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col_), error_);
+    return ADBC_STATUS_OK;
+  }
+
+  // Appends one row per database (optionally filtered by catalog) to array.
+  AdbcStatusCode AppendCatalogs() {
+    struct StringBuilder query = {0};
+    if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+    if (StringBuilderAppend(&query, "%s", "SELECT datname FROM pg_catalog.pg_database")) {
+      StringBuilderReset(&query);
+      return ADBC_STATUS_INTERNAL;
+    }
+
+    if (catalog_ != NULL) {
+      RAISE_ADBC(AppendEscapedLiteral(&query, " WHERE datname = ", catalog_, "catalog"));
+    }
+
+    PqResultHelper result_helper = PqResultHelper{conn_, query.buffer};
+    StringBuilderReset(&query);
+
+    if (result_helper.Status() != PGRES_TUPLES_OK) {
+      SetError(error_, "%s%s", "Failed to list catalogs: ", PQerrorMessage(conn_));
+      return ADBC_STATUS_INTERNAL;
+    }
+
+    for (PqResultRow row : result_helper) {
+      const char* db_name = row[0].data;
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(catalog_name_col_, ArrowCharView(db_name)),
+               error_);
+      if (depth_ == ADBC_OBJECT_DEPTH_CATALOGS) {
+        CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col_, 1), error_);
+      } else {
+        RAISE_ADBC(AppendSchemas(db_name));
+      }
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(array_), error_);
+    }
+
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode FinishArrowArray() {
+    CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array_, &na_error_),
+                    &na_error_, error_);
+
+    return ADBC_STATUS_OK;
+  }
+
+  PGconn* conn_;
+  int depth_;
+  const char* catalog_;
+  const char* db_schema_;
+  const char* table_name_;
+  const char** table_types_;
+  const char* column_name_;
+  struct ArrowSchema* schema_;
+  struct ArrowArray* array_;
+  struct AdbcError* error_;
+  struct ArrowError na_error_;
+  std::string current_db_;
+  // Children of array_, assigned in GetObjects() once array_ is initialized.
+  struct ArrowArray* catalog_name_col_ = NULL;
+  struct ArrowArray* catalog_db_schemas_col_ = NULL;
+  struct ArrowArray* catalog_db_schemas_items_ = NULL;
+  struct ArrowArray* db_schema_name_col_ = NULL;
+  struct ArrowArray* db_schema_tables_col_ = NULL;
+};
+
 }  // namespace
 
 namespace adbcpq {
@@ -216,151 +402,6 @@ AdbcStatusCode PostgresConnection::GetInfo(struct AdbcConnection* connection,
   return BatchToArrayStream(&array, &schema, out, error);
 }
 
-AdbcStatusCode PostgresConnectionGetSchemasImpl(PGconn* conn, int depth,
-                                                const char* db_name,
-                                                const char* db_schema,
-                                                struct ArrowArray* db_schemas_list,
-                                                struct AdbcError* error) {
-  struct ArrowArray* db_schema_items = db_schemas_list->children[0];
-  struct ArrowArray* db_schema_names = db_schema_items->children[0];
-  struct ArrowArray* db_schema_tables_list = db_schema_items->children[1];
-
-  // inefficient to place here but better localized until we do a class-based refactor
-  std::string curr_db;
-  PqResultHelper curr_db_helper = PqResultHelper{conn, "SELECT current_database()"};
-  if (curr_db_helper.Status() == PGRES_TUPLES_OK) {
-    assert(curr_db_helper.NumRows() == 1);
-    auto curr_iter = curr_db_helper.begin();
-    PqResultRow db_row = *curr_iter;
-    curr_db = std::string(db_row[0].data);
-  } else {
-    return ADBC_STATUS_INTERNAL;
-  }
-
-  // postgres only allows you to list schemas for the currently connected db
-  if (strcmp(db_name, curr_db.c_str()) == 0) {
-    struct StringBuilder query = {0};
-    if (StringBuilderInit(&query, /*initial_size*/ 256)) {
-      return ADBC_STATUS_INTERNAL;
-    }
-
-    const char* stmt =
-        "SELECT nspname FROM pg_catalog.pg_namespace WHERE "
-        "nspname !~ '^pg_' AND nspname <> 'information_schema'";
-
-    if (StringBuilderAppend(&query, "%s", stmt)) {
-      StringBuilderReset(&query);
-      return ADBC_STATUS_INTERNAL;
-    }
-
-    if (db_schema != NULL) {
-      char* schema_name = PQescapeIdentifier(conn, db_schema, strlen(db_schema));
-      if (schema_name == NULL) {
-        SetError(error, "%s%s", "Failed to escape schema: ", PQerrorMessage(conn));
-        StringBuilderReset(&query);
-        return ADBC_STATUS_INVALID_ARGUMENT;
-      }
-
-      int res =
-          StringBuilderAppend(&query, "%s%s%s", " AND nspname ='", schema_name, "'");
-      PQfreemem(schema_name);
-      if (res) {
-        return ADBC_STATUS_INTERNAL;
-      }
-    }
-
-    auto result_helper = PqResultHelper{conn, query.buffer};
-    StringBuilderReset(&query);
-
-    if (result_helper.Status() == PGRES_TUPLES_OK) {
-      for (PqResultRow row : result_helper) {
-        const char* schema_name = row[0].data;
-        CHECK_NA(INTERNAL,
-                 ArrowArrayAppendString(db_schema_names, ArrowCharView(schema_name)),
-                 error);
-        if (depth >= ADBC_OBJECT_DEPTH_TABLES) {
-          return ADBC_STATUS_NOT_IMPLEMENTED;
-        } else {
-          CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_list, 1), error);
-        }
-        CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_items), error);
-      }
-    } else {
-      return ADBC_STATUS_NOT_IMPLEMENTED;
-    }
-  }
-
-  CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schemas_list), error);
-
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode PostgresConnectionGetObjectsImpl(
-    PGconn* conn, int depth, const char* catalog, const char* db_schema,
-    const char* table_name, const char** table_types, const char* column_name,
-    struct ArrowSchema* schema, struct ArrowArray* array, struct AdbcError* error) {
-  RAISE_ADBC(AdbcInitConnectionObjectsSchema(schema, error));
-
-  struct ArrowError na_error = {0};
-  CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
-                  error);
-  CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
-
-  struct ArrowArray* catalog_name_col = array->children[0];
-  struct ArrowArray* catalog_db_schemas_col = array->children[1];
-
-  struct ArrowArray* catalog_db_schemas_items = catalog_db_schemas_col->children[0];
-  struct ArrowArray* db_schema_name_col = catalog_db_schemas_items->children[0];
-  struct ArrowArray* db_schema_tables_col = catalog_db_schemas_items->children[1];
-
-  struct StringBuilder query = {0};
-  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
-
-  if (StringBuilderAppend(&query, "%s", "SELECT datname FROM pg_catalog.pg_database")) {
-    return ADBC_STATUS_INTERNAL;
-  }
-
-  if (catalog != NULL) {
-    char* catalog_name = PQescapeIdentifier(conn, catalog, strlen(catalog));
-    if (catalog_name == NULL) {
-      SetError(error, "%s%s", "Failed to escape catalog: ", PQerrorMessage(conn));
-      StringBuilderReset(&query);
-      return ADBC_STATUS_INVALID_ARGUMENT;
-    }
-
-    int res =
-        StringBuilderAppend(&query, "%s%s%s", " WHERE datname = '", catalog_name, "'");
-    PQfreemem(catalog_name);
-    if (res) {
-      return ADBC_STATUS_INTERNAL;
-    }
-  }
-
-  PqResultHelper result_helper = PqResultHelper{conn, query.buffer};
-  StringBuilderReset(&query);
-
-  if (result_helper.Status() == PGRES_TUPLES_OK) {
-    for (PqResultRow row : result_helper) {
-      const char* db_name = row[0].data;
-      CHECK_NA(INTERNAL, ArrowArrayAppendString(catalog_name_col, ArrowCharView(db_name)),
-               error);
-      if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
-        CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col, 1), error);
-      } else {
-        RAISE_ADBC(PostgresConnectionGetSchemasImpl(conn, depth, db_name, db_schema,
-                                                    catalog_db_schemas_col, error));
-      }
-      CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
-    }
-  } else {
-    return ADBC_STATUS_INTERNAL;
-  }
-
-  CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array, &na_error), &na_error,
-                  error);
-  return ADBC_STATUS_OK;
-}
-
 AdbcStatusCode PostgresConnection::GetObjects(
     struct AdbcConnection* connection, int depth, const char* catalog,
     const char* db_schema, const char* table_name, const char** table_types,
@@ -368,9 +409,9 @@ AdbcStatusCode PostgresConnection::GetObjects(
   struct ArrowSchema schema = {0};
   struct ArrowArray array = {0};
 
-  AdbcStatusCode status =
-      PostgresConnectionGetObjectsImpl(conn_, depth, catalog, db_schema, table_name,
-                                       table_types, column_name, &schema, &array, error);
+  PqGetObjectsHelper helper(conn_, depth, catalog, db_schema, table_name, table_types,
+                            column_name, &schema, &array, error);
+  AdbcStatusCode status = helper.GetObjects();
 
   if (status != ADBC_STATUS_OK) {
     if (schema.release) schema.release(&schema);

Reply via email to