This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 49cbd98e51b [FLINK-38312][table] Add support for `SHOW MATERIALIZED TABLES` 49cbd98e51b is described below commit 49cbd98e51bc6fbc43845ca87bec74e1a0531769 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Wed Sep 3 21:05:33 2025 +0200 [FLINK-38312][table] Add support for `SHOW MATERIALIZED TABLES` --- flink-python/pyflink/table/catalog.py | 12 +++ flink-python/pyflink/table/table_environment.py | 13 +++ .../cli/parser/SqlCommandParserImplTest.java | 1 + .../src/main/codegen/data/Parser.tdd | 3 +- .../src/main/codegen/includes/parserImpls.ftl | 51 +++-------- .../apache/flink/sql/parser/dql/SqlShowTables.java | 35 ++++++-- .../apache/flink/sql/parser/dql/SqlShowViews.java | 66 --------------- .../apache/flink/table/api/TableEnvironment.java | 10 +++ .../table/api/internal/TableEnvironmentImpl.java | 5 ++ .../apache/flink/table/catalog/CatalogManager.java | 30 +++++++ .../table/catalog/GenericInMemoryCatalog.java | 78 +++++++---------- .../ShowMaterializedTablesOperation.java | 98 ++++++++++++++++++++++ .../org/apache/flink/table/catalog/Catalog.java | 25 +++++- .../operations/converters/SqlNodeConverters.java | 1 - .../converters/SqlShowTablesConverter.java | 27 +++++- .../converters/SqlShowViewsConverter.java | 52 ------------ .../table/planner/calcite/FlinkPlannerImpl.scala | 1 - .../table/planner/catalog/UnknownCatalogTest.java | 5 +- .../operations/SqlOtherOperationConverterTest.java | 34 +++++++- .../SqlShowToOperationConverterTest.java | 18 +++- 20 files changed, 340 insertions(+), 225 deletions(-) diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 8033a95ba2c..40bb06c8eb4 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -157,6 +157,18 @@ class Catalog(object): """ return list(self._j_catalog.listViews(database_name)) + def list_materialized_tables(self, database_name: str) -> List[str]: + """ + Get names of all materialized tables under this database. + An empty list is returned if none exists. + + :param database_name: Name of the given database. + :return: A list of the names of all materialized tables in the given database. + :raise: CatalogException in case of any runtime exception. + DatabaseNotExistException if the database does not exist. + """ + return list(self._j_catalog.listMaterializedTables(database_name)) + def get_table(self, table_path: 'ObjectPath') -> 'CatalogBaseTable': """ Get a CatalogTable or CatalogView identified by tablePath. diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 5d3debe59a5..539815974b4 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -619,6 +619,19 @@ class TableEnvironment(object): j_view_name_array = self._j_tenv.listViews() return [item for item in j_view_name_array] + def list_materialized_tables(self) -> List[str]: + """ + Gets the names of all materialized tables available + in the current namespace (the current database of the current catalog). + + :return: A list of the names of all registered materialized tables + in the current database of the current catalog. + + .. versionadded:: 2.2.0 + """ + j_materialized_table_name_array = self._j_tenv.listMaterializedTables() + return [item for item in j_materialized_table_name_array] + def list_user_defined_functions(self) -> List[str]: """ Gets the names of all user defined functions registered in this environment. diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java index 84e02975691..6490c633731 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java @@ -88,6 +88,7 @@ public class SqlCommandParserImplTest { TestSpec.of("SHOW CREATE VIEW (what_ever);", OTHER), TestSpec.of("SHOW CREATE syntax_error;", OTHER), TestSpec.of("SHOW TABLES;", OTHER), + TestSpec.of("SHOW MATERIALIZED TABLES;", OTHER), TestSpec.of("BEGIN STATEMENT SET;", OTHER), TestSpec.of("BEGIN statement;", OTHER), TestSpec.of("END;", OTHER), diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 05619b382e7..478bafc4005 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -131,6 +131,7 @@ "org.apache.flink.sql.parser.dql.SqlShowPartitions" "org.apache.flink.sql.parser.dql.SqlShowProcedures" "org.apache.flink.sql.parser.dql.SqlShowTables" + "org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind" "org.apache.flink.sql.parser.dql.SqlShowColumns" "org.apache.flink.sql.parser.dql.SqlShowCreate" "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable" @@ -138,7 +139,6 @@ "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog" - "org.apache.flink.sql.parser.dql.SqlShowViews" "org.apache.flink.sql.parser.dql.SqlRichDescribeFunction" "org.apache.flink.sql.parser.dql.SqlRichDescribeModel" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" @@ -625,7 +625,6 @@ "SqlShowModules()" "SqlShowPartitions()" "SqlShowProcedures()" - "SqlShowViews()" "SqlUnloadModule()" "SqlUseModules()" "SqlRichExplain()" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 45b21f51af6..c84ecbc0ea3 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -660,46 +660,9 @@ SqlShowProcedures SqlShowProcedures() : } } -/** - * SHOW VIEWS FROM [catalog.] database sql call. - */ -SqlShowViews SqlShowViews() : -{ - SqlIdentifier databaseName = null; - SqlCharStringLiteral likeLiteral = null; - String prep = null; - boolean notLike = false; - SqlParserPos pos; -} -{ - <SHOW> <VIEWS> - { pos = getPos(); } - [ - ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } ) - { pos = getPos(); } - databaseName = CompoundIdentifier() - ] - [ - [ - <NOT> - { - notLike = true; - } - ] - <LIKE> <QUOTED_STRING> - { - String likeCondition = SqlParserUtil.parseString(token.image); - likeLiteral = SqlLiteral.createCharString(likeCondition, getPos()); - } - ] - { - return new SqlShowViews(pos, prep, databaseName, notLike, likeLiteral); - } -} - /** * Parses a show tables statement. -* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ]; +* SHOW ( VIEWS | [ MATERIALIZED ]TABLES ) [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ]; */ SqlShowTables SqlShowTables() : { @@ -708,9 +671,17 @@ SqlShowTables SqlShowTables() : String prep = null; boolean notLike = false; SqlParserPos pos; + SqlTableKind kind; } { - <SHOW> <TABLES> + <SHOW> + ( + <TABLES> { kind = SqlTableKind.TABLE; } + | + <VIEWS> { kind = SqlTableKind.VIEW; } + | + <MATERIALIZED> <TABLES> { kind = SqlTableKind.MATERIALIZED_TABLE; } + ) { pos = getPos(); } [ ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } ) @@ -731,7 +702,7 @@ SqlShowTables SqlShowTables() : } ] { - return new SqlShowTables(pos, prep, databaseName, notLike, likeLiteral); + return new SqlShowTables(pos, kind, prep, databaseName, notLike, likeLiteral); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java index 4c1d8592a35..626e21f9348 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java @@ -29,17 +29,17 @@ import org.apache.calcite.sql.parser.SqlParserPos; * SHOW TABLES sql call. The full syntax for show functions is as followings: * * <pre>{@code - * SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE + * SHOW ( VIEWS | [ MATERIALIZED ]TABLES ) [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE * <sql_like_pattern> ] statement * }</pre> */ public class SqlShowTables extends SqlShowCall { - public static final SqlSpecialOperator OPERATOR = - new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER); + private final SqlTableKind kind; public SqlShowTables( SqlParserPos pos, + SqlTableKind kind, String preposition, SqlIdentifier databaseName, boolean notLike, @@ -52,15 +52,40 @@ public class SqlShowTables extends SqlShowCall { likeLiteral == null ? null : "LIKE", likeLiteral, notLike); + this.kind = kind; } @Override public SqlOperator getOperator() { - return OPERATOR; + return kind.getOperator(); } @Override String getOperationName() { - return "SHOW TABLES"; + return getOperator().getName(); + } + + public SqlTableKind getTableKind() { + return kind; + } + + /** + * The kind of table. Keep in sync with {@link + * org.apache.flink.table.catalog.CatalogBaseTable.TableKind}. + */ + public enum SqlTableKind { + MATERIALIZED_TABLE(new SqlSpecialOperator("SHOW MATERIALIZED TABLES", SqlKind.OTHER)), + TABLE(new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER)), + VIEW(new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER)); + + private final SqlSpecialOperator operator; + + SqlTableKind(final SqlSpecialOperator operator) { + this.operator = operator; + } + + public SqlSpecialOperator getOperator() { + return operator; + } } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java deleted file mode 100644 index 3fb593f8f04..00000000000 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.sql.parser.dql; - -import org.apache.calcite.sql.SqlCharStringLiteral; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlSpecialOperator; -import org.apache.calcite.sql.parser.SqlParserPos; - -/** - * SHOW VIEWS sql call. The full syntax for show functions is as followings: - * - * <pre>{@code - * SHOW VIEWS [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE - * <sql_like_pattern> ] statement - * }</pre> - */ -public class SqlShowViews extends SqlShowCall { - - public static final SqlSpecialOperator OPERATOR = - new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER); - - public SqlShowViews( - SqlParserPos pos, - String preposition, - SqlIdentifier databaseName, - boolean notLike, - SqlCharStringLiteral likeLiteral) { - // only LIKE currently supported for SHOW VIEWS - super( - pos, - preposition, - databaseName, - likeLiteral == null ? null : "LIKE", - likeLiteral, - notLike); - } - - @Override - public SqlOperator getOperator() { - return OPERATOR; - } - - @Override - String getOperationName() { - return "SHOW VIEWS"; - } -} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index b0bc8538833..dada96ae619 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -1236,6 +1236,16 @@ public interface TableEnvironment { */ String[] listViews(); + /** + * Gets the names of all materialized tables available in the current namespace (the current + * database of the current catalog). + * + * @return A list of the names of all registered materialized tables in the current database of + * the current catalog. + * @see #listTemporaryViews() + */ + String[] listMaterializedTables(); + /** * Gets the names of all temporary tables and views available in the current namespace (the * current database of the current catalog). diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 37017b9df32..058f7a92be9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -731,6 +731,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { return catalogManager.listViews().stream().sorted().toArray(String[]::new); } + @Override + public String[] listMaterializedTables() { + return catalogManager.listMaterializedTables().stream().sorted().toArray(String[]::new); + } + @Override public String[] listTemporaryTables() { return catalogManager.listTemporaryTables().stream().sorted().toArray(String[]::new); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 2d6d090b3ee..c1ee34749af 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -904,6 +904,36 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { .filter(e -> e.getValue() instanceof CatalogView); } + /** + * Returns an array of names of materialized tables registered in the namespace of the current + * catalog and database. + * + * @return names of registered materialized tables + */ + public Set<String> listMaterializedTables() { + return listMaterializedTables(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of materialized tables registered in the namespace of the given + * catalog and database. + * + * @return names of registered materialized tables + */ + public Set<String> listMaterializedTables(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + + try { + return new HashSet<>(catalog.listMaterializedTables(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + /** * Lists all available schemas in the root of the catalog manager. It is not equivalent to * listing all catalogs as it includes also different catalog parts of the temporary objects. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index a41a012f9d1..15c8f00dc91 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -325,35 +326,20 @@ public class GenericInMemoryCatalog extends AbstractCatalog { @Override public List<String> listTables(String databaseName) throws DatabaseNotExistException { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(databaseName), - "databaseName cannot be null or empty"); - - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(getName(), databaseName); - } - - return tables.keySet().stream() - .filter(k -> k.getDatabaseName().equals(databaseName)) - .map(k -> k.getObjectName()) - .collect(Collectors.toList()); + return listObjectsUnderDatabase(tables, databaseName, objectPath -> true); } @Override public List<String> listViews(String databaseName) throws DatabaseNotExistException { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(databaseName), - "databaseName cannot be null or empty"); - - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(getName(), databaseName); - } + return listObjectsUnderDatabase( + tables, databaseName, k -> (tables.get(k) instanceof CatalogView)); + } - return tables.keySet().stream() - .filter(k -> k.getDatabaseName().equals(databaseName)) - .filter(k -> (tables.get(k) instanceof CatalogView)) - .map(k -> k.getObjectName()) - .collect(Collectors.toList()); + @Override + public List<String> listMaterializedTables(String databaseName) + throws DatabaseNotExistException { + return listObjectsUnderDatabase( + tables, databaseName, k -> (tables.get(k) instanceof CatalogMaterializedTable)); } @Override @@ -447,18 +433,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog { @Override public List<String> listModels(String databaseName) throws DatabaseNotExistException { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(databaseName), - "databaseName cannot be null or empty"); - - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(getName(), databaseName); - } - - return models.keySet().stream() - .filter(k -> k.getDatabaseName().equals(databaseName)) - .map(k -> k.getObjectName()) - .collect(Collectors.toList()); + return listObjectsUnderDatabase(models, databaseName, k -> true); } @Override @@ -543,18 +518,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog { @Override public List<String> listFunctions(String databaseName) throws DatabaseNotExistException { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(databaseName), - "databaseName cannot be null or empty"); - - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(getName(), databaseName); - } - - return functions.keySet().stream() - .filter(k -> k.getDatabaseName().equals(databaseName)) - .map(k -> k.getObjectName()) - .collect(Collectors.toList()); + return listObjectsUnderDatabase(functions, databaseName, k -> true); } @Override @@ -904,4 +868,22 @@ public class GenericInMemoryCatalog extends AbstractCatalog { throw new PartitionNotExistException(getName(), tablePath, partitionSpec); } } + + private List<String> listObjectsUnderDatabase( + Map<ObjectPath, ?> map, String databaseName, Predicate<ObjectPath> filter) + throws DatabaseNotExistException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "databaseName cannot be null or empty"); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return map.keySet().stream() + .filter(k -> k.getDatabaseName().equals(databaseName)) + .filter(filter) + .map(ObjectPath::getObjectName) + .collect(Collectors.toList()); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java new file mode 100644 index 00000000000..d800b9228b8 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.operations.utils.ShowLikeOperator; + +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * Operation to describe a SHOW MATERIALIZED TABLES statement. The full syntax for SHOW MATERIALIZED + * TABLES is as followings: + * + * <pre>{@code + * SHOW MATERIALIZED TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE + * <sql_like_pattern> ] statement + * }</pre> + */ +@Internal +public class ShowMaterializedTablesOperation extends AbstractShowOperation { + private final @Nullable String databaseName; + + public ShowMaterializedTablesOperation( + @Nullable String catalogName, + @Nullable String databaseName, + @Nullable String preposition, + @Nullable ShowLikeOperator likeOp) { + super(catalogName, preposition, likeOp); + this.databaseName = databaseName; + } + + public ShowMaterializedTablesOperation( + @Nullable String catalogName, + @Nullable String databaseName, + @Nullable ShowLikeOperator likeOp) { + this(catalogName, databaseName, null, likeOp); + } + + @Override + protected Collection<String> retrieveDataForTableResult(Context ctx) { + final CatalogManager catalogManager = ctx.getCatalogManager(); + final String qualifiedCatalogName = catalogManager.qualifyCatalog(catalogName); + final String qualifiedDatabaseName = catalogManager.qualifyDatabase(databaseName); + if (preposition == null) { + return catalogManager.listMaterializedTables(); + } else { + Catalog catalog = catalogManager.getCatalogOrThrowException(qualifiedCatalogName); + if (catalog.databaseExists(qualifiedDatabaseName)) { + return catalogManager.listMaterializedTables( + qualifiedCatalogName, qualifiedDatabaseName); + } else { + throw new ValidationException( + String.format( + "Database '%s'.'%s' doesn't exist.", + qualifiedCatalogName, qualifiedDatabaseName)); + } + } + } + + @Override + protected String getOperationName() { + return "SHOW MATERIALIZED TABLES"; + } + + @Override + protected String getColumnName() { + return "materialized table"; + } + + @Override + public String getPrepositionSummaryString() { + if (databaseName == null) { + return super.getPrepositionSummaryString(); + } + return super.getPrepositionSummaryString() + "." + databaseName; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index bc8c8f185a3..e230b616cdb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -214,13 +214,13 @@ public interface Catalog { void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; - // ------ tables and views ------ + // ------ tables, views and materialized tables ------ /** - * Get names of all tables and views under this database. An empty list is returned if none - * exists. + * Get names of all tables, views and materialized tables under this database. An empty list is + * returned if none exists. * - * @return a list of the names of all tables and views in this database + * @return a list of the names of all tables, views and materialized tables in this database * @throws DatabaseNotExistException if the database does not exist * @throws CatalogException in case of any runtime exception */ @@ -236,6 +236,23 @@ public interface Catalog { */ List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException; + /** + * Get names of all materialized tables under this database. An empty list is returned if none + * exists. + * + * @param databaseName the name of the given database + * @return a list of the names of all materialized tables in the given database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + default List<String> listMaterializedTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "listMaterializedTables(String) is not implemented for %s.", + this.getClass())); + } + /** * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link * ObjectPath}. The framework will resolve the metadata objects when necessary. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 26fe98680fe..9f67c8bde10 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -73,7 +73,6 @@ public class SqlNodeConverters { register(new SqlDropMaterializedTableConverter()); register(new SqlDropModelConverter()); register(new SqlShowTablesConverter()); - register(new SqlShowViewsConverter()); register(new SqlShowCatalogsConverter()); register(new SqlDescribeFunctionConverter()); register(new SqlDescribeModelConverter()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java index d252b513085..1cf3fd67536 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java @@ -19,8 +19,11 @@ package org.apache.flink.table.planner.operations.converters; import org.apache.flink.sql.parser.dql.SqlShowTables; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ShowMaterializedTablesOperation; import org.apache.flink.table.operations.ShowTablesOperation; +import org.apache.flink.table.operations.ShowViewsOperation; import org.apache.flink.table.operations.utils.ShowLikeOperator; import javax.annotation.Nullable; @@ -32,7 +35,17 @@ public class SqlShowTablesConverter extends AbstractSqlShowConverter<SqlShowTabl @Nullable String catalogName, @Nullable String databaseName, @Nullable ShowLikeOperator likeOp) { - return new ShowTablesOperation(catalogName, databaseName, likeOp); + switch (sqlShowCall.getTableKind()) { + case MATERIALIZED_TABLE: + return new ShowMaterializedTablesOperation(catalogName, databaseName, likeOp); + case TABLE: + return new ShowTablesOperation(catalogName, databaseName, likeOp); + case VIEW: + return new ShowViewsOperation(catalogName, databaseName, likeOp); + default: + throw new ValidationException( + "Not supported table kind " + sqlShowCall.getTableKind() + " yet"); + } } @Override @@ -42,7 +55,17 @@ public class SqlShowTablesConverter extends AbstractSqlShowConverter<SqlShowTabl @Nullable String databaseName, String prep, @Nullable ShowLikeOperator likeOp) { - return new ShowTablesOperation(catalogName, databaseName, prep, likeOp); + switch (sqlShowCall.getTableKind()) { + case MATERIALIZED_TABLE: + return new ShowMaterializedTablesOperation(catalogName, databaseName, prep, likeOp); + case TABLE: + return new ShowTablesOperation(catalogName, databaseName, prep, likeOp); + case VIEW: + return new ShowViewsOperation(catalogName, databaseName, prep, likeOp); + default: + throw new ValidationException( + "Not supported table kind " + sqlShowCall.getTableKind() + " yet"); + } } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java deleted file mode 100644 index b2f89a9a737..00000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.operations.converters; - -import org.apache.flink.sql.parser.dql.SqlShowViews; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.ShowViewsOperation; -import org.apache.flink.table.operations.utils.ShowLikeOperator; - -import javax.annotation.Nullable; - -public class SqlShowViewsConverter extends AbstractSqlShowConverter<SqlShowViews> { - @Override - public Operation getOperationWithoutPrep( - SqlShowViews sqlShowCall, - @Nullable String catalogName, - @Nullable String databaseName, - @Nullable ShowLikeOperator likeOp) { - return new ShowViewsOperation(catalogName, databaseName, likeOp); - } - - @Override - public Operation getOperation( - SqlShowViews sqlShowCall, - @Nullable String catalogName, - @Nullable String databaseName, - String prep, - @Nullable ShowLikeOperator likeOp) { - return new ShowViewsOperation(catalogName, databaseName, prep, likeOp); - } - - @Override - public Operation convertSqlNode(SqlShowViews sqlShowViews, ConvertContext context) { - return convertShowOperation(sqlShowViews, context); - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 9d8570848e7..ee329a0ecd6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -144,7 +144,6 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowFunctions] || sqlNode.isInstanceOf[SqlShowJars] || sqlNode.isInstanceOf[SqlShowModules] - || sqlNode.isInstanceOf[SqlShowViews] || sqlNode.isInstanceOf[SqlShowColumns] || sqlNode.isInstanceOf[SqlShowPartitions] || sqlNode.isInstanceOf[SqlShowProcedures] diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java index 42df8dda6c9..756e429ee08 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java @@ -118,7 +118,9 @@ class UnknownCatalogTest { "SHOW PROCEDURES", "SHOW PROCEDURES IN db", "SHOW COLUMNS IN db", - "SHOW DATABASES" + "SHOW DATABASES", + "SHOW MATERIALIZED TABLES", + "SHOW MATERIALIZED TABLES IN db" }) void showForUnsetCatalog(String sql) { TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); @@ -133,6 +135,7 @@ class UnknownCatalogTest { @ValueSource( strings = { "SHOW TABLES", + "SHOW MATERIALIZED TABLES", "SHOW VIEWS", "SHOW PROCEDURES", // Here `db` is considered as object name diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java index 07e62ca75eb..56c9e0a95e9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java @@ -30,6 +30,7 @@ import org.apache.flink.table.operations.ShowCreateCatalogOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope; +import org.apache.flink.table.operations.ShowMaterializedTablesOperation; import org.apache.flink.table.operations.ShowModulesOperation; import org.apache.flink.table.operations.ShowPartitionsOperation; import org.apache.flink.table.operations.ShowProceduresOperation; @@ -73,7 +74,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; * Test cases for the statements that neither belong to DDL nor DML for {@link * SqlNodeToOperationConversion}. */ -public class SqlOtherOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlOtherOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test void testUseCatalog() { @@ -307,6 +308,37 @@ public class SqlOtherOperationConverterTest extends SqlNodeToOperationConversion "SHOW VIEWS")); } + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("inputForShowMaterializedTablesTest") + void testShowMaterializedTables( + String sql, ShowMaterializedTablesOperation expected, String expectedSummary) { + Operation operation = parse(sql); + assertThat(operation) + .isInstanceOf(ShowMaterializedTablesOperation.class) + .isEqualTo(expected); + assertThat(operation.asSummaryString()).isEqualTo(expectedSummary); + } + + private static Stream<Arguments> inputForShowMaterializedTablesTest() { + return Stream.of( + Arguments.of( + "SHOW MATERIALIZED TABLES from cat1.db1 not like 't%'", + new ShowMaterializedTablesOperation( + "cat1", + "db1", + "FROM", + ShowLikeOperator.of(LikeType.NOT_LIKE, "t%")), + "SHOW MATERIALIZED TABLES FROM cat1.db1 NOT LIKE 't%'"), + Arguments.of( + "SHOW MATERIALIZED TABLES in db2", + new ShowMaterializedTablesOperation("builtin", "db2", "IN", null), + "SHOW MATERIALIZED TABLES IN builtin.db2"), + Arguments.of( + "SHOW MATERIALIZED TABLES", + new ShowMaterializedTablesOperation("builtin", "default", null, null), + "SHOW MATERIALIZED TABLES")); + } + @Test void testShowCreateCatalog() { Operation operation = parse("show create catalog cat1"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java index e625df03ccd..60606aa9771 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java @@ -42,7 +42,14 @@ public class SqlShowToOperationConverterTest extends SqlNodeToOperationConversio } @ParameterizedTest - @ValueSource(strings = {"SHOW TABLES", "SHOW VIEWS", "SHOW FUNCTIONS", "SHOW PROCEDURES"}) + @ValueSource( + strings = { + "SHOW TABLES", + "SHOW VIEWS", + "SHOW FUNCTIONS", + "SHOW PROCEDURES", + "SHOW MATERIALIZED TABLES" + }) void testParseShowFunctionForUnsetCatalog(String sql) { catalogManager.setCurrentCatalog(null); // No exception should be thrown during parsing. @@ -51,7 +58,14 @@ public class SqlShowToOperationConverterTest extends SqlNodeToOperationConversio } @ParameterizedTest - @ValueSource(strings = {"SHOW TABLES", "SHOW VIEWS", "SHOW FUNCTIONS", "SHOW PROCEDURES"}) + @ValueSource( + strings = { + "SHOW TABLES", + "SHOW VIEWS", + "SHOW FUNCTIONS", + "SHOW PROCEDURES", + "SHOW MATERIALIZED TABLES" + }) void testParseShowFunctionForUnsetDatabase(String sql) { catalogManager.setCurrentDatabase(null); // No exception should be thrown during parsing.