This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch fixup-metadata-getobjects-snowflake in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
commit 9d4695b1fcb66379bbcdd39f8299da3002d35112 Author: Matt Topol <[email protected]> AuthorDate: Mon Oct 14 16:25:54 2024 -0400 feat(go/adbc/driver/snowflake): improve GetObjects performance and semantics --- go/adbc/driver/internal/driverbase/connection.go | 42 ++- go/adbc/driver/snowflake/connection.go | 306 ++++++++++++++------- .../snowflake/queries/get_objects_catalogs.sql | 25 -- .../snowflake/queries/get_objects_dbschemas.sql | 32 ++- .../snowflake/queries/get_objects_tables.sql | 94 +++---- 5 files changed, 305 insertions(+), 194 deletions(-) diff --git a/go/adbc/driver/internal/driverbase/connection.go b/go/adbc/driver/internal/driverbase/connection.go index 6e7881635..37433e0ce 100644 --- a/go/adbc/driver/internal/driverbase/connection.go +++ b/go/adbc/driver/internal/driverbase/connection.go @@ -349,14 +349,17 @@ func (cnxn *connection) GetObjects(ctx context.Context, depth adbc.ObjectDepth, bufferSize := len(catalogs) addCatalogCh := make(chan GetObjectsInfo, bufferSize) - for _, cat := range catalogs { - addCatalogCh <- GetObjectsInfo{CatalogName: Nullable(cat)} - } - - close(addCatalogCh) + errCh := make(chan error, 1) + go func() { + defer close(addCatalogCh) + for _, cat := range catalogs { + addCatalogCh <- GetObjectsInfo{CatalogName: Nullable(cat)} + } + }() if depth == adbc.ObjectDepthCatalogs { - return BuildGetObjectsRecordReader(cnxn.Base().Alloc, addCatalogCh) + close(errCh) + return BuildGetObjectsRecordReader(cnxn.Base().Alloc, addCatalogCh, errCh) } g, ctxG := errgroup.WithContext(ctx) @@ -386,7 +389,7 @@ func (cnxn *connection) GetObjects(ctx context.Context, depth adbc.ObjectDepth, g.Go(func() error { defer close(addDbSchemasCh); return gSchemas.Wait() }) if depth == adbc.ObjectDepthDBSchemas { - rdr, err := BuildGetObjectsRecordReader(cnxn.Base().Alloc, addDbSchemasCh) + rdr, err := BuildGetObjectsRecordReader(cnxn.Base().Alloc, addDbSchemasCh, errCh) return rdr, errors.Join(err, g.Wait()) } @@ -432,7 +435,7 @@ func (cnxn *connection) GetObjects(ctx context.Context, depth adbc.ObjectDepth, g.Go(func() error { defer close(addTablesCh); return gTables.Wait() }) - rdr, err := BuildGetObjectsRecordReader(cnxn.Base().Alloc, addTablesCh) + rdr, err := BuildGetObjectsRecordReader(cnxn.Base().Alloc, addTablesCh, errCh) return rdr, errors.Join(err, g.Wait()) } @@ -659,17 +662,26 @@ func (g *GetObjectsInfo) Scan(src any) error { // BuildGetObjectsRecordReader constructs a RecordReader for the GetObjects ADBC method. // It accepts a channel of GetObjectsInfo to allow concurrent retrieval of metadata and // serialization to Arrow record. -func BuildGetObjectsRecordReader(mem memory.Allocator, in chan GetObjectsInfo) (array.RecordReader, error) { +func BuildGetObjectsRecordReader(mem memory.Allocator, in <-chan GetObjectsInfo, errCh <-chan error) (array.RecordReader, error) { bldr := array.NewRecordBuilder(mem, adbc.GetObjectsSchema) defer bldr.Release() - for catalog := range in { - b, err := json.Marshal(catalog) - if err != nil { - return nil, err - } +CATALOGLOOP: + for { + select { + case catalog, ok := <-in: + if !ok { + break CATALOGLOOP + } + b, err := json.Marshal(catalog) + if err != nil { + return nil, err + } - if err := json.Unmarshal(b, bldr); err != nil { + if err := json.Unmarshal(b, bldr); err != nil { + return nil, err + } + case err := <-errCh: return nil, err } } diff --git a/go/adbc/driver/snowflake/connection.go b/go/adbc/driver/snowflake/connection.go index a8361a365..77d3c2cd5 100644 --- a/go/adbc/driver/snowflake/connection.go +++ b/go/adbc/driver/snowflake/connection.go @@ -24,6 +24,7 @@ import ( "embed" "fmt" "io" + "io/fs" "path" "strconv" "strings" @@ -42,7 +43,6 @@ const ( defaultPrefetchConcurrency = 10 queryTemplateGetObjectsAll = "get_objects_all.sql" - queryTemplateGetObjectsCatalogs = "get_objects_catalogs.sql" queryTemplateGetObjectsDbSchemas = "get_objects_dbschemas.sql" queryTemplateGetObjectsTables = "get_objects_tables.sql" queryTemplateGetObjectsTerseCatalogs = "get_objects_terse_catalogs.sql" @@ -73,9 +73,51 @@ type connectionImpl struct { useHighPrecision bool } +func escapeSingleQuoteForLike(arg string) string { + if len(arg) == 0 { + return arg + } + + idx := strings.IndexByte(arg, '\'') + if idx == -1 { + return arg + } + + var b strings.Builder + b.Grow(len(arg)) + + for { + before, after, found := strings.Cut(arg, `'`) + b.WriteString(before) + if !found { + return b.String() + } + + if before[len(before)-1] != '\\' { + b.WriteByte('\\') + } + b.WriteByte('\'') + arg = after + } +} + +func getQueryID(ctx context.Context, query string, driverConn any) (string, error) { + rows, err := driverConn.(driver.QueryerContext).QueryContext(ctx, query, nil) + if err != nil { + return "", err + } + + return rows.(gosnowflake.SnowflakeRows).GetQueryID(), rows.Close() +} + +func isWildcardStr(ident string) bool { + return strings.ContainsAny(ident, "_%") +} + func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) { var ( pkQueryID, fkQueryID, uniqueQueryID, terseDbQueryID string + showSchemaQueryID, tableQueryID string ) conn, err := c.sqldb.Conn(ctx) @@ -85,82 +127,165 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, defer conn.Close() gQueryIDs, gQueryIDsCtx := errgroup.WithContext(ctx) + queryFile := queryTemplateGetObjectsAll switch depth { case adbc.ObjectDepthCatalogs: - if catalog == nil { - queryFile = queryTemplateGetObjectsTerseCatalogs - // if the catalog is null, show the terse databases - // which doesn't require a database context - gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) error { - rows, err := driverConn.(driver.QueryerContext).QueryContext(gQueryIDsCtx, "SHOW TERSE DATABASES", nil) - if err != nil { - return err - } + queryFile = queryTemplateGetObjectsTerseCatalogs + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjectsCatalogs */ DATABASES" + if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" + } + query += " IN ACCOUNT" - terseDbQueryID = rows.(gosnowflake.SnowflakeRows).GetQueryID() - return rows.Close() - }) + terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return }) - } else { - queryFile = queryTemplateGetObjectsCatalogs - } + }) case adbc.ObjectDepthDBSchemas: queryFile = queryTemplateGetObjectsDbSchemas + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ SCHEMAS" + if dbSchema != nil && len(*dbSchema) > 0 && *dbSchema != "%" && *dbSchema != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*dbSchema) + "'" + } + if catalog == nil || isWildcardStr(*catalog) { + query += " IN ACCOUNT" + } else { + query += " IN DATABASE \"" + quoteTblName(*catalog) + "\"" + } + + showSchemaQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return + }) + }) + + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ DATABASES" + if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" + } + query += " IN ACCOUNT" + + terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return + }) + }) case adbc.ObjectDepthTables: queryFile = queryTemplateGetObjectsTables fallthrough default: + var suffix string + if catalog == nil { + suffix = " IN ACCOUNT" + } else { + escapedCatalog := quoteTblName(*catalog) + if dbSchema == nil || isWildcardStr(*dbSchema) { + suffix = " IN DATABASE \"" + escapedCatalog + "\"" + } else { + escapedSchema := quoteTblName(*dbSchema) + if tableName == nil || isWildcardStr(*tableName) { + suffix = " IN SCHEMA \"" + escapedCatalog + "\".\"" + escapedSchema + "\"" + } else { + escapedTable := quoteTblName(*tableName) + suffix = " IN TABLE \"" + escapedCatalog + "\".\"" + escapedSchema + "\".\"" + escapedTable + "\"" + } + } + } + // Detailed constraint info not available in information_schema // Need to dispatch SHOW queries and use conn.Raw to extract the queryID for reuse in GetObjects query gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) error { - rows, err := driverConn.(driver.QueryerContext).QueryContext(gQueryIDsCtx, "SHOW PRIMARY KEYS", nil) - if err != nil { - return err + return conn.Raw(func(driverConn any) (err error) { + pkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW PRIMARY KEYS /* ADBC:getObjectsTables */"+suffix, driverConn) + return err + }) + }) + + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + fkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW IMPORTED KEYS /* ADBC:getObjectsTables */"+suffix, driverConn) + return err + }) + }) + + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + uniqueQueryID, err = getQueryID(gQueryIDsCtx, "SHOW UNIQUE KEYS /* ADBC:getObjectsTables */"+suffix, driverConn) + return err + }) + }) + + gQueryIDs.Go(func() error { + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ SCHEMAS" + if dbSchema != nil && len(*dbSchema) > 0 && *dbSchema != "%" && *dbSchema != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*dbSchema) + "'" + } + if catalog == nil || isWildcardStr(*catalog) { + query += " IN ACCOUNT" + } else { + query += " IN DATABASE \"" + quoteTblName(*catalog) + "\"" } - pkQueryID = rows.(gosnowflake.SnowflakeRows).GetQueryID() - return rows.Close() + showSchemaQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return }) }) gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) error { - rows, err := driverConn.(driver.QueryerContext).QueryContext(gQueryIDsCtx, "SHOW IMPORTED KEYS", nil) - if err != nil { - return err + return conn.Raw(func(driverConn any) (err error) { + query := "SHOW TERSE /* ADBC:getObjectsDBSchemas */ DATABASES" + if catalog != nil && len(*catalog) > 0 && *catalog != "%" && *catalog != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*catalog) + "'" } + query += " IN ACCOUNT" - fkQueryID = rows.(gosnowflake.SnowflakeRows).GetQueryID() - return rows.Close() + terseDbQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return }) }) gQueryIDs.Go(func() error { - return conn.Raw(func(driverConn any) error { - rows, err := driverConn.(driver.QueryerContext).QueryContext(gQueryIDsCtx, "SHOW UNIQUE KEYS", nil) - if err != nil { - return err + return conn.Raw(func(driverConn any) (err error) { + objType := "objects" + if len(tableType) == 1 { + if strings.EqualFold("VIEW", tableType[0]) { + objType = "views" + } else if strings.EqualFold("TABLE", tableType[0]) { + objType = "tables" + } } - uniqueQueryID = rows.(gosnowflake.SnowflakeRows).GetQueryID() - return rows.Close() + query := "SHOW TERSE /* ADBC:getObjectsTables */ " + objType + if tableName != nil && len(*tableName) > 0 && *tableName != "%" && *tableName != ".*" { + query += " LIKE '" + escapeSingleQuoteForLike(*tableName) + "'" + } + if catalog == nil || isWildcardStr(*catalog) { + query += " IN ACCOUNT" + } else { + escapedCatalog := quoteTblName(*catalog) + if dbSchema == nil || isWildcardStr(*dbSchema) { + query += " IN DATABASE \"" + escapedCatalog + "\"" + } else { + query += " IN SCHEMA \"" + escapedCatalog + "\".\"" + quoteTblName(*dbSchema) + "\"" + } + } + + tableQueryID, err = getQueryID(gQueryIDsCtx, query, driverConn) + return }) }) } - f, err := queryTemplates.Open(path.Join("queries", queryFile)) + queryBytes, err := fs.ReadFile(queryTemplates, path.Join("queries", queryFile)) if err != nil { return nil, err } - defer f.Close() - - var bldr strings.Builder - if _, err := io.Copy(&bldr, f); err != nil { - return nil, err - } // Need constraint subqueries to complete before we can query GetObjects if err := gQueryIDs.Wait(); err != nil { @@ -180,76 +305,71 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, sql.Named("FK_QUERY_ID", fkQueryID), sql.Named("UNIQUE_QUERY_ID", uniqueQueryID), sql.Named("SHOW_DB_QUERY_ID", terseDbQueryID), - } - - // the connection that is used is not the same connection context where the database may have been set - // if the caller called SetCurrentCatalog() so need to ensure the database context is appropriate - if !isNilOrEmpty(catalog) { - _, e := conn.ExecContext(context.Background(), fmt.Sprintf("USE DATABASE %s;", quoteTblName(*catalog)), nil) - if e != nil { - return nil, errToAdbcErr(adbc.StatusIO, e) + sql.Named("SHOW_SCHEMA_QUERY_ID", showSchemaQueryID), + sql.Named("SHOW_TABLE_QUERY_ID", tableQueryID), + } + + // currently only the Columns / all case still requires a current database/schema + // to be propagated. The rest of the cases all solely use SHOW queries for the metadata + // just as done by the snowflake JDBC driver. In those cases we don't need to propagate + // the current session database/schema. + if depth == adbc.ObjectDepthColumns || depth == adbc.ObjectDepthAll { + // the connection that is used is not the same connection context where the database may have been set + // if the caller called SetCurrentCatalog() so need to ensure the database context is appropriate + if !isNilOrEmpty(catalog) { + _, e := conn.ExecContext(context.Background(), fmt.Sprintf("USE DATABASE %s;", quoteTblName(*catalog)), nil) + if e != nil { + return nil, errToAdbcErr(adbc.StatusIO, e) + } } - } - // the connection that is used is not the same connection context where the schema may have been set - // if the caller called SetCurrentDbSchema() so need to ensure the schema context is appropriate - if !isNilOrEmpty(dbSchema) { - _, e2 := conn.ExecContext(context.Background(), fmt.Sprintf("USE SCHEMA %s;", quoteTblName(*dbSchema)), nil) - if e2 != nil { - return nil, errToAdbcErr(adbc.StatusIO, e2) + // the connection that is used is not the same connection context where the schema may have been set + // if the caller called SetCurrentDbSchema() so need to ensure the schema context is appropriate + if !isNilOrEmpty(dbSchema) { + _, e2 := conn.ExecContext(context.Background(), fmt.Sprintf("USE SCHEMA %s;", quoteTblName(*dbSchema)), nil) + if e2 != nil { + return nil, errToAdbcErr(adbc.StatusIO, e2) + } } } - query := bldr.String() + query := string(queryBytes) rows, err := conn.QueryContext(ctx, query, args...) if err != nil { return nil, errToAdbcErr(adbc.StatusIO, err) } defer rows.Close() - catalogCh := make(chan driverbase.GetObjectsInfo, 1) - readerCh := make(chan array.RecordReader) + catalogCh := make(chan driverbase.GetObjectsInfo, 5) errCh := make(chan error) go func() { - rdr, err := driverbase.BuildGetObjectsRecordReader(c.Alloc, catalogCh) - if err != nil { - errCh <- err - } - - readerCh <- rdr - close(readerCh) - }() - - for rows.Next() { - var getObjectsCatalog driverbase.GetObjectsInfo - if err := rows.Scan(&getObjectsCatalog); err != nil { - return nil, errToAdbcErr(adbc.StatusInvalidData, err) - } + defer close(catalogCh) + for rows.Next() { + var getObjectsCatalog driverbase.GetObjectsInfo + if err := rows.Scan(&getObjectsCatalog); err != nil { + errCh <- errToAdbcErr(adbc.StatusInvalidData, err) + return + } - // A few columns need additional processing outside of Snowflake - for i, sch := range getObjectsCatalog.CatalogDbSchemas { - for j, tab := range sch.DbSchemaTables { - for k, col := range tab.TableColumns { - field := c.toArrowField(col) - xdbcDataType := driverbase.ToXdbcDataType(field.Type) + // A few columns need additional processing outside of Snowflake + for i, sch := range getObjectsCatalog.CatalogDbSchemas { + for j, tab := range sch.DbSchemaTables { + for k, col := range tab.TableColumns { + field := c.toArrowField(col) + xdbcDataType := driverbase.ToXdbcDataType(field.Type) - getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = driverbase.Nullable(int16(field.Type.ID())) - getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = driverbase.Nullable(int16(xdbcDataType)) + getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = driverbase.Nullable(int16(field.Type.ID())) + getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = driverbase.Nullable(int16(xdbcDataType)) + } } } - } - catalogCh <- getObjectsCatalog - } - close(catalogCh) + catalogCh <- getObjectsCatalog + } + }() - select { - case rdr := <-readerCh: - return rdr, nil - case err := <-errCh: - return nil, err - } + return driverbase.BuildGetObjectsRecordReader(c.Alloc, catalogCh, errCh) } func isNilOrEmpty(str *string) bool { @@ -266,7 +386,7 @@ func (c *connectionImpl) PrepareDriverInfo(ctx context.Context, infoCodes []adbc // ListTableTypes implements driverbase.TableTypeLister. func (*connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { - return []string{"BASE TABLE", "TEMPORARY TABLE", "VIEW"}, nil + return []string{"TABLE", "VIEW"}, nil } // GetCurrentCatalog implements driverbase.CurrentNamespacer. diff --git a/go/adbc/driver/snowflake/queries/get_objects_catalogs.sql b/go/adbc/driver/snowflake/queries/get_objects_catalogs.sql deleted file mode 100644 index ec2cef515..000000000 --- a/go/adbc/driver/snowflake/queries/get_objects_catalogs.sql +++ /dev/null @@ -1,25 +0,0 @@ --- Licensed to the Apache Software Foundation (ASF) under one --- or more contributor license agreements. See the NOTICE file --- distributed with this work for additional information --- regarding copyright ownership. The ASF licenses this file --- to you under the Apache License, Version 2.0 (the --- "License"); you may not use this file except in compliance --- with the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, --- software distributed under the License is distributed on an --- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY --- KIND, either express or implied. See the License for the --- specific language governing permissions and limitations --- under the License. - -SELECT - { - 'catalog_name': database_name, - 'catalog_db_schemas': null - } get_objects -FROM - information_schema.databases -WHERE database_name ILIKE :CATALOG; diff --git a/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql b/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql index 360a6d083..627d11c32 100644 --- a/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql +++ b/go/adbc/driver/snowflake/queries/get_objects_dbschemas.sql @@ -16,23 +16,27 @@ -- under the License. WITH db_schemas AS ( - SELECT - catalog_name, - schema_name, - FROM information_schema.schemata - WHERE catalog_name ILIKE :CATALOG AND schema_name ILIKE :DB_SCHEMA + SELECT + "database_name" as "catalog_name", + "name" as "schema_name" + FROM table(RESULT_SCAN(:SHOW_SCHEMA_QUERY_ID)) + WHERE "database_name" ILIKE :CATALOG +), db_info AS ( + SELECT "name" AS "database_name" + FROM table(RESULT_SCAN(:SHOW_DB_QUERY_ID)) + WHERE "name" ILIKE :CATALOG ) -SELECT +SELECT { - 'catalog_name': database_name, + 'catalog_name': "database_name", 'catalog_db_schemas': ARRAY_AGG({ - 'db_schema_name': schema_name, + 'db_schema_name': "schema_name", 'db_schema_tables': null }) } get_objects -FROM - information_schema.databases -LEFT JOIN db_schemas -ON database_name = catalog_name -WHERE database_name ILIKE :CATALOG -GROUP BY database_name; +FROM + db_info +LEFT JOIN db_schemas +ON "database_name" = "catalog_name" +WHERE "database_name" ILIKE :CATALOG +GROUP BY "database_name"; diff --git a/go/adbc/driver/snowflake/queries/get_objects_tables.sql b/go/adbc/driver/snowflake/queries/get_objects_tables.sql index b3b16ff51..ec284947c 100644 --- a/go/adbc/driver/snowflake/queries/get_objects_tables.sql +++ b/go/adbc/driver/snowflake/queries/get_objects_tables.sql @@ -17,37 +17,37 @@ WITH pk_constraints AS ( SELECT - "database_name" table_catalog, - "schema_name" table_schema, - "table_name" table_name, - "constraint_name" constraint_name, - 'PRIMARY KEY' constraint_type, + "database_name" "table_catalog", + "schema_name" "table_schema", + "table_name" "table_name", + "constraint_name" "constraint_name", + 'PRIMARY KEY' "constraint_type", ARRAY_AGG("column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, [] constraint_column_usage, - FROM TABLE(RESULT_SCAN(:PK_QUERY_ID)) - WHERE table_catalog ILIKE :CATALOG AND table_schema ILIKE :DB_SCHEMA AND table_name ILIKE :TABLE - GROUP BY table_catalog, table_schema, table_name, "constraint_name" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-4))) + WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE + GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" ), unique_constraints AS ( SELECT - "database_name" table_catalog, - "schema_name" table_schema, - "table_name" table_name, - "constraint_name" constraint_name, - 'UNIQUE' constraint_type, + "database_name" "table_catalog", + "schema_name" "table_schema", + "table_name" "table_name", + "constraint_name" "constraint_name", + 'UNIQUE' "constraint_type", ARRAY_AGG("column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, [] constraint_column_usage, FROM TABLE(RESULT_SCAN(:UNIQUE_QUERY_ID)) - WHERE table_catalog ILIKE :CATALOG AND table_schema ILIKE :DB_SCHEMA AND table_name ILIKE :TABLE - GROUP BY table_catalog, table_schema, table_name, "constraint_name" + WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE + GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" ), fk_constraints AS ( SELECT - "fk_database_name" table_catalog, - "fk_schema_name" table_schema, - "fk_table_name" table_name, - "fk_name" constraint_name, - 'FOREIGN KEY' constraint_type, + "fk_database_name" "table_catalog", + "fk_schema_name" "table_schema", + "fk_table_name" "table_name", + "fk_name" "constraint_name", + 'FOREIGN KEY' "constraint_type", ARRAY_AGG("fk_column_name") WITHIN GROUP (ORDER BY "key_sequence") constraint_column_names, ARRAY_AGG({ 'fk_catalog': "pk_database_name", @@ -56,17 +56,17 @@ fk_constraints AS ( 'fk_column_name': "pk_column_name" }) WITHIN GROUP (ORDER BY "key_sequence") constraint_column_usage, FROM TABLE(RESULT_SCAN(:FK_QUERY_ID)) - WHERE table_catalog ILIKE :CATALOG AND table_schema ILIKE :DB_SCHEMA AND table_name ILIKE :TABLE - GROUP BY table_catalog, table_schema, table_name, constraint_name + WHERE "table_catalog" ILIKE :CATALOG AND "table_schema" ILIKE :DB_SCHEMA AND "table_name" ILIKE :TABLE + GROUP BY "table_catalog", "table_schema", "table_name", "constraint_name" ), constraints AS ( SELECT - table_catalog, - table_schema, - table_name, + "table_catalog", + "table_schema", + "table_name", ARRAY_AGG({ - 'constraint_name': constraint_name, - 'constraint_type': constraint_type, + 'constraint_name': "constraint_name", + 'constraint_type': "constraint_type", 'constraint_column_names': constraint_column_names, 'constraint_column_usage': constraint_column_usage }) table_constraints, @@ -77,45 +77,45 @@ constraints AS ( UNION ALL SELECT * FROM fk_constraints ) - GROUP BY table_catalog, table_schema, table_name + GROUP BY "table_catalog", "table_schema", "table_name" ), tables AS ( SELECT - table_catalog catalog_name, - table_schema schema_name, + "database_name" "catalog_name", + "schema_name" "schema_name", ARRAY_AGG({ - 'table_name': table_name, - 'table_type': table_type, + 'table_name': "name", + 'table_type': "kind", 'table_constraints': table_constraints, 'table_columns': null }) db_schema_tables -FROM information_schema.tables +FROM TABLE(RESULT_SCAN(:SHOW_TABLE_QUERY_ID)) LEFT JOIN constraints -USING (table_catalog, table_schema, table_name) -WHERE table_catalog ILIKE :CATALOG AND table_schema ILIKE :DB_SCHEMA AND table_name ILIKE :TABLE -GROUP BY table_catalog, table_schema +ON "database_name" = "table_catalog" AND "schema_name" = "table_schema" AND "name" = "table_name" +WHERE "database_name" ILIKE :CATALOG AND "schema_name" ILIKE :DB_SCHEMA AND "name" ILIKE :TABLE +GROUP BY "database_name", "schema_name" ), db_schemas AS ( SELECT - catalog_name, - schema_name, + "database_name" "catalog_name", + "name" "schema_name", db_schema_tables, - FROM information_schema.schemata + FROM TABLE(RESULT_SCAN(:SHOW_SCHEMA_QUERY_ID)) LEFT JOIN tables - USING (catalog_name, schema_name) - WHERE catalog_name ILIKE :CATALOG AND schema_name ILIKE :DB_SCHEMA + ON "database_name" = "catalog_name" AND "name" = tables."schema_name" + WHERE "database_name" ILIKE :CATALOG AND "name" ILIKE :DB_SCHEMA ) SELECT { - 'catalog_name': database_name, + 'catalog_name': "name", 'catalog_db_schemas': ARRAY_AGG({ - 'db_schema_name': schema_name, + 'db_schema_name': db_schemas."schema_name", 'db_schema_tables': db_schema_tables }) } get_objects FROM - information_schema.databases + TABLE(RESULT_SCAN(:SHOW_DB_QUERY_ID)) LEFT JOIN db_schemas -ON database_name = catalog_name -WHERE database_name ILIKE :CATALOG -GROUP BY database_name; +ON "name" = "catalog_name" +WHERE "name" ILIKE :CATALOG +GROUP BY "name";
