This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch fixup-metadata-getobjects-snowflake in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
commit 9acf8a8ab373cc9e249337d3677bfd3d7973b3c9 Author: Matt Topol <[email protected]> AuthorDate: Wed Oct 16 12:04:05 2024 -0400 updates to make tests work --- c/driver/snowflake/snowflake_test.cc | 5 +- c/validation/adbc_validation_connection.cc | 9 +- c/validation/adbc_validation_statement.cc | 4 +- go/adbc/driver/internal/driverbase/connection.go | 9 +- go/adbc/driver/snowflake/connection.go | 160 ++++++++++++--------- .../driver/snowflake/queries/get_objects_all.sql | 18 +-- .../snowflake/queries/get_objects_dbschemas.sql | 4 +- .../snowflake/queries/get_objects_tables.sql | 76 +--------- 8 files changed, 123 insertions(+), 162 deletions(-) diff --git a/c/driver/snowflake/snowflake_test.cc b/c/driver/snowflake/snowflake_test.cc index 60003353d..90735b3ff 100644 --- a/c/driver/snowflake/snowflake_test.cc +++ b/c/driver/snowflake/snowflake_test.cc @@ -131,7 +131,9 @@ class SnowflakeQuirks : public adbc_validation::DriverQuirks { return NANOARROW_TYPE_DOUBLE; case NANOARROW_TYPE_STRING: case NANOARROW_TYPE_LARGE_STRING: - return NANOARROW_TYPE_STRING; + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + return NANOARROW_TYPE_STRING; default: return ingest_type; } @@ -150,6 +152,7 @@ class SnowflakeQuirks : public adbc_validation::DriverQuirks { bool supports_error_on_incompatible_schema() const override { return false; } bool ddl_implicit_commit_txn() const override { return true; } std::string db_schema() const override { return schema_; } + std::string catalog() const override { return "ADBC_TESTING"; } const char* uri_; bool skip_{false}; diff --git a/c/validation/adbc_validation_connection.cc b/c/validation/adbc_validation_connection.cc index a885fa2c8..6ef430213 100644 --- a/c/validation/adbc_validation_connection.cc +++ b/c/validation/adbc_validation_connection.cc @@ -701,9 +701,8 @@ void ConnectionTest::TestMetadataGetObjectsTablesTypes() { db_schemas_index < ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); db_schemas_index++) { - ASSERT_FALSE(ArrowArrayViewIsNull(db_schema_tables_list, db_schemas_index)) - << "Row " << row << " should have non-null db_schema_tables"; - + + // db_schema_tables should either be null or an empty list for (int64_t tables_index = ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index); tables_index < @@ -752,6 +751,8 @@ void ConnectionTest::TestMetadataGetObjectsColumns() { test_cases.push_back({std::nullopt, {"int64s", "strings"}, {1, 2}}); test_cases.push_back({"in%", {"int64s"}, {1}}); + const std::string catalog = quirks()->catalog(); + for (const auto& test_case : test_cases) { std::string scope = "Filter: "; scope += test_case.filter ? *test_case.filter : "(no filter)"; @@ -763,7 +764,7 @@ void ConnectionTest::TestMetadataGetObjectsColumns() { ASSERT_THAT( AdbcConnectionGetObjects( - &connection, ADBC_OBJECT_DEPTH_COLUMNS, nullptr, nullptr, nullptr, nullptr, + &connection, ADBC_OBJECT_DEPTH_COLUMNS, catalog.c_str(), nullptr, nullptr, nullptr, test_case.filter.has_value() ? test_case.filter->c_str() : nullptr, &reader.stream.value, &error), IsOkStatus(&error)); diff --git a/c/validation/adbc_validation_statement.cc b/c/validation/adbc_validation_statement.cc index 07ab0b22a..19166d852 100644 --- a/c/validation/adbc_validation_statement.cc +++ b/c/validation/adbc_validation_statement.cc @@ -2218,7 +2218,7 @@ void StatementTest::TestSqlBind() { ASSERT_THAT( AdbcStatementSetSqlQuery( - &statement, "SELECT * FROM bindtest ORDER BY \"col1\" ASC NULLS FIRST", &error), + &statement, "SELECT * FROM bindtest ORDER BY col1 ASC NULLS FIRST", &error), IsOkStatus(&error)); { StreamReader reader; @@ -2226,7 +2226,7 @@ void StatementTest::TestSqlBind() { &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, - ::testing::AnyOf(::testing::Eq(0), ::testing::Eq(-1))); + ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(reader.Next()); diff --git a/go/adbc/driver/internal/driverbase/connection.go b/go/adbc/driver/internal/driverbase/connection.go index 37433e0ce..b09f74e30 100644 --- a/go/adbc/driver/internal/driverbase/connection.go +++ b/go/adbc/driver/internal/driverbase/connection.go @@ -624,20 +624,20 @@ type ColumnInfo struct { type TableInfo struct { TableName string `json:"table_name"` TableType string `json:"table_type"` - TableColumns []ColumnInfo `json:"table_columns,omitempty"` - TableConstraints []ConstraintInfo `json:"table_constraints,omitempty"` + TableColumns []ColumnInfo `json:"table_columns"` + TableConstraints []ConstraintInfo `json:"table_constraints"` } // DBSchemaInfo is a structured representation of adbc.DBSchemaSchema type DBSchemaInfo struct { DbSchemaName *string `json:"db_schema_name,omitempty"` - DbSchemaTables []TableInfo `json:"db_schema_tables,omitempty"` + DbSchemaTables []TableInfo `json:"db_schema_tables"` } // GetObjectsInfo is a structured representation of adbc.GetObjectsSchema type GetObjectsInfo struct { CatalogName *string `json:"catalog_name,omitempty"` - CatalogDbSchemas []DBSchemaInfo `json:"catalog_db_schemas,omitempty"` + CatalogDbSchemas []DBSchemaInfo `json:"catalog_db_schemas"` } // Scan implements sql.Scanner. @@ -688,6 +688,7 @@ CATALOGLOOP: rec := bldr.NewRecord() defer rec.Release() + return array.NewRecordReader(adbc.GetObjectsSchema, []arrow.Record{rec}) } diff --git a/go/adbc/driver/snowflake/connection.go b/go/adbc/driver/snowflake/connection.go index 77d3c2cd5..90ae67f4a 100644 --- a/go/adbc/driver/snowflake/connection.go +++ b/go/adbc/driver/snowflake/connection.go @@ -26,6 +26,7 @@ import ( "io" "io/fs" "path" + "runtime" "strconv" "strings" "time" @@ -110,6 +111,41 @@ func getQueryID(ctx context.Context, query string, driverConn any) (string, erro return rows.(gosnowflake.SnowflakeRows).GetQueryID(), rows.Close() } +const ( + objSchemas = "SCHEMAS" + objDatabases = "DATABASES" +) + +func goGetQueryID(ctx context.Context, conn *sql.Conn, grp *errgroup.Group, objType string, catalog, dbSchema *string, outQueryID *string) { + grp.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjects */ " + objType + switch objType { + case objDatabases: + if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" + } + query += " IN ACCOUNT" + case objSchemas: + if dbSchema != nil && len(*dbSchema) > 0 && *dbSchema != "%" && *dbSchema != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*dbSchema) + "'" + } + + if catalog == nil || isWildcardStr(*catalog) { + query += " IN ACCOUNT" + } else { + query += " IN DATABASE " + quoteTblName(*catalog) + } + default: + return fmt.Errorf("unimplemented object type") + } + + *outQueryID, err = getQueryID(ctx, query, driverConn) + return + }) + }) +} + func isWildcardStr(ident string) bool { return strings.ContainsAny(ident, "_%") } @@ -126,73 +162,84 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, } defer conn.Close() + var hasViews, hasTables bool + for _, t := range tableType { + if strings.EqualFold("VIEW", t) { + hasViews = true + } else if strings.EqualFold("TABLE", t) { + hasTables = true + } + } + + if len(tableType) > 0 && depth >= adbc.ObjectDepthTables && !hasViews && !hasTables { + depth = adbc.ObjectDepthDBSchemas + } gQueryIDs, gQueryIDsCtx := errgroup.WithContext(ctx) queryFile := queryTemplateGetObjectsAll switch depth { case adbc.ObjectDepthCatalogs: queryFile = queryTemplateGetObjectsTerseCatalogs - gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) (err error) { - query := "SHOW TERSE /* ADBC:getObjectsCatalogs */ DATABASES" - if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { - query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" - } - query += " IN ACCOUNT" - - terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) - return - }) - }) + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases, + catalog, dbSchema, &terseDbQueryID) case adbc.ObjectDepthDBSchemas: queryFile = queryTemplateGetObjectsDbSchemas + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas, + catalog, dbSchema, &showSchemaQueryID) + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases, + catalog, dbSchema, &terseDbQueryID) + case adbc.ObjectDepthTables: + queryFile = queryTemplateGetObjectsTables + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas, + catalog, dbSchema, &showSchemaQueryID) + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases, + catalog, dbSchema, &terseDbQueryID) gQueryIDs.Go(func() error { return conn.Raw(func(driverConn any) (err error) { - query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ SCHEMAS" - if dbSchema != nil && len(*dbSchema) > 0 && *dbSchema != "%" && *dbSchema != ".*" { - query += " LIKE '" + escapeSingleQuoteForLike(*dbSchema) + "'" + objType := "objects" + if len(tableType) == 1 { + if strings.EqualFold("VIEW", tableType[0]) { + objType = "views" + } else if strings.EqualFold("TABLE", tableType[0]) { + objType = "tables" + } + } + + query := "SHOW TERSE /* ADBC:getObjectsTables */ " + objType + if tableName != nil && len(*tableName) > 0 && *tableName != "%" && *tableName != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*tableName) + "'" } if catalog == nil || isWildcardStr(*catalog) { query += " IN ACCOUNT" } else { - query += " IN DATABASE \"" + quoteTblName(*catalog) + "\"" - } - - showSchemaQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) - return - }) - }) - - gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) (err error) { - query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ DATABASES" - if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { - query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" + escapedCatalog := quoteTblName(*catalog) + if dbSchema == nil || isWildcardStr(*dbSchema) { + query += " IN DATABASE " + escapedCatalog + } else { + query += " IN SCHEMA " + escapedCatalog + "." + quoteTblName(*dbSchema) + } } - query += " IN ACCOUNT" - terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + tableQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) return }) }) - case adbc.ObjectDepthTables: - queryFile = queryTemplateGetObjectsTables - fallthrough + // fallthrough default: var suffix string - if catalog == nil { + if catalog == nil || isWildcardStr(*catalog) { suffix = " IN ACCOUNT" } else { escapedCatalog := quoteTblName(*catalog) if dbSchema == nil || isWildcardStr(*dbSchema) { - suffix = " IN DATABASE \"" + escapedCatalog + "\"" + suffix = " IN DATABASE " + escapedCatalog } else { escapedSchema := quoteTblName(*dbSchema) if tableName == nil || isWildcardStr(*tableName) { - suffix = " IN SCHEMA \"" + escapedCatalog + "\".\"" + escapedSchema + "\"" + suffix = " IN SCHEMA " + escapedCatalog + "." + escapedSchema } else { escapedTable := quoteTblName(*tableName) - suffix = " IN TABLE \"" + escapedCatalog + "\".\"" + escapedSchema + "\".\"" + escapedTable + "\"" + suffix = " IN TABLE " + escapedCatalog + "." + escapedSchema + "." + escapedTable } } } @@ -220,35 +267,10 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, }) }) - gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) (err error) { - query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ SCHEMAS" - if dbSchema != nil && len(*dbSchema) > 0 && *dbSchema != "%" && *dbSchema != ".*" { - query += " LIKE '" + escapeSingleQuoteForLike(*dbSchema) + "'" - } - if catalog == nil || isWildcardStr(*catalog) { - query += " IN ACCOUNT" - } else { - query += " IN DATABASE \"" + quoteTblName(*catalog) + "\"" - } - - showSchemaQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) - return - }) - }) - - gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) (err error) { - query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ DATABASES" - if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { - query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" - } - query += " IN ACCOUNT" - - terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) - return - }) - }) + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases, + catalog, dbSchema, &terseDbQueryID) + goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas, + catalog, dbSchema, &showSchemaQueryID) gQueryIDs.Go(func() error { return conn.Raw(func(driverConn any) (err error) { @@ -270,9 +292,9 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, } else { escapedCatalog := quoteTblName(*catalog) if dbSchema == nil || isWildcardStr(*dbSchema) { - query += " IN DATABASE \"" + escapedCatalog + "\"" + query += " IN DATABASE " + escapedCatalog } else { - query += " IN SCHEMA \"" + escapedCatalog + "\".\"" + quoteTblName(*dbSchema) + "\"" + query += " IN SCHEMA " + escapedCatalog + "." + quoteTblName(*dbSchema) } } @@ -340,7 +362,7 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, } defer rows.Close() - catalogCh := make(chan driverbase.GetObjectsInfo, 5) + catalogCh := make(chan driverbase.GetObjectsInfo, runtime.NumCPU()) errCh := make(chan error) go func() { diff --git a/go/adbc/driver/snowflake/queries/get_objects_all.sql b/go/adbc/driver/snowflake/queries/get_objects_all.sql index 45b807f15..7fc10f2e2 100644 --- a/go/adbc/driver/snowflake/queries/get_objects_all.sql +++ b/go/adbc/driver/snowflake/queries/get_objects_all.sql @@ -86,12 +86,12 @@ constraints AS ( table_catalog, table_schema, table_name, - ARRAY_AGG({ + ARRAY_AGG(NULLIF({ 'constraint_name': constraint_name, 'constraint_type': constraint_type, 'constraint_column_names': constraint_column_names, 'constraint_column_usage': constraint_column_usage - }) table_constraints, + }, {})) table_constraints, FROM ( SELECT * FROM pk_constraints UNION ALL @@ -105,12 +105,12 @@ tables AS ( SELECT table_catalog catalog_name, table_schema schema_name, - ARRAY_AGG({ + ARRAY_AGG(NULLIF({ 'table_name': table_name, 'table_type': table_type, - 'table_columns': table_columns, - 'table_constraints': table_constraints - }) db_schema_tables + 'table_columns': COALESCE(table_columns, []), + 'table_constraints': COALESCE(table_constraints, []) + }, {})) db_schema_tables FROM information_schema.tables LEFT JOIN columns USING (table_catalog, table_schema, table_name) @@ -123,7 +123,7 @@ db_schemas AS ( SELECT catalog_name, schema_name, - db_schema_tables, + COALESCE(db_schema_tables, []) db_schema_tables, FROM information_schema.schemata LEFT JOIN tables USING (catalog_name, schema_name) @@ -132,10 +132,10 @@ db_schemas AS ( SELECT { 'catalog_name': database_name, - 'catalog_db_schemas': ARRAY_AGG({ + 'catalog_db_schemas': ARRAY_AGG(NULLIF({ 'db_schema_name': schema_name, 'db_schema_tables': db_schema_tables - }) + }, {})) } get_objects FROM information_schema.databases diff --git a/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql b/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql index bc454866a..872118f7c 100644 --- a/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql +++ b/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql @@ -29,10 +29,10 @@ WITH db_schemas AS ( SELECT { 'catalog_name': "database_name", - 'catalog_db_schemas': ARRAY_AGG({ + 'catalog_db_schemas': ARRAY_AGG(NULLIF({ 'db_schema_name': "schema_name", 'db_schema_tables': null - }) + }, {})) } get_objects FROM db_info diff --git a/go/adbc/driver/snowflake/queries/get_objects_tables.sql b/go/adbc/driver/snowflake/queries/get_objects_tables.sql index ec284947c..9d6ce36ed 100644 --- a/go/adbc/driver/snowflake/queries/get_objects_tables.sql +++ b/go/adbc/driver/snowflake/queries/get_objects_tables.sql @@ -15,83 +15,17 @@ -- specific language governing permissions and limitations -- under the License. -WITH pk_constraints AS ( - SELECT - "database_name" "table_catalog", - "schema_name" "table_schema", - "table_name" "table_name", - "constraint_name" "constraint_name", - 'PRIMARY KEY' "constraint_type", - ARRAY_AGG("column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, - [] constraint_column_usage, - FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-4))) - WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE - GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" -), -unique_constraints AS ( - SELECT - "database_name" "table_catalog", - "schema_name" "table_schema", - "table_name" "table_name", - "constraint_name" "constraint_name", - 'UNIQUE' "constraint_type", - ARRAY_AGG("column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, - [] constraint_column_usage, - FROM TABLE(RESULT_SCAN(:UNIQUE_QUERY_ID)) - WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE - GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" -), -fk_constraints AS ( - SELECT - "fk_database_name" "table_catalog", - "fk_schema_name" "table_schema", - "fk_table_name" "table_name", - "fk_name" "constraint_name", - 'FOREIGN KEY' "constraint_type", - ARRAY_AGG("fk_column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, - ARRAY_AGG({ - 'fk_catalog': "pk_database_name", - 'fk_db_schema': "pk_schema_name", - 'fk_table': "pk_table_name", - 'fk_column_name': "pk_column_name" - }) WITHIN GROUP (ORDER BY "key_sequence") constraint_column_usage, - FROM TABLE(RESULT_SCAN(:FK_QUERY_ID)) - WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE - GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" -), -constraints AS ( - SELECT - "table_catalog", - "table_schema", - "table_name", - ARRAY_AGG({ - 'constraint_name': "constraint_name", - 'constraint_type': "constraint_type", - 'constraint_column_names': constraint_column_names, - 'constraint_column_usage': constraint_column_usage - }) table_constraints, - FROM ( - SELECT * FROM pk_constraints - UNION ALL - SELECT * FROM unique_constraints - UNION ALL - SELECT * FROM fk_constraints - ) - GROUP BY "table_catalog", "table_schema", "table_name" -), -tables AS ( +WITH tables AS ( SELECT "database_name" "catalog_name", "schema_name" "schema_name", ARRAY_AGG({ 'table_name': "name", 'table_type': "kind", - 'table_constraints': table_constraints, + 'table_constraints': null, 'table_columns': null }) db_schema_tables FROM TABLE(RESULT_SCAN(:SHOW_TABLE_QUERY_ID)) -LEFT JOIN constraints -ON "database_name" = "table_catalog" AND "schema_name" = "table_schema" AND "name" = "table_name" WHERE "database_name" ILIKE :CATALOG AND "schema_name" ILIKE :DB_SCHEMA AND "name" ILIKE :TABLE GROUP BY "database_name", "schema_name" ), @@ -99,7 +33,7 @@ db_schemas AS ( SELECT "database_name" "catalog_name", "name" "schema_name", - db_schema_tables, + COALESCE(db_schema_tables, []) db_schema_tables, FROM TABLE(RESULT_SCAN(:SHOW_SCHEMA_QUERY_ID)) LEFT JOIN tables ON "database_name" = "catalog_name" AND "name" = tables."schema_name" @@ -108,10 +42,10 @@ db_schemas AS ( SELECT { 'catalog_name': "name", - 'catalog_db_schemas': ARRAY_AGG({ + 'catalog_db_schemas': ARRAY_AGG(NULLIF({ 'db_schema_name': db_schemas."schema_name", 'db_schema_tables': db_schema_tables - }) + }, {})) } get_objects FROM TABLE(RESULT_SCAN(:SHOW_DB_QUERY_ID))
