This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 49cbd98e51b [FLINK-38312][table] Add support for `SHOW MATERIALIZED TABLES`
49cbd98e51b is described below

commit 49cbd98e51bc6fbc43845ca87bec74e1a0531769
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Wed Sep 3 21:05:33 2025 +0200

    [FLINK-38312][table] Add support for `SHOW MATERIALIZED TABLES`
---
 flink-python/pyflink/table/catalog.py              | 12 +++
 flink-python/pyflink/table/table_environment.py    | 13 +++
 .../cli/parser/SqlCommandParserImplTest.java       |  1 +
 .../src/main/codegen/data/Parser.tdd               |  3 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 51 +++--------
 .../apache/flink/sql/parser/dql/SqlShowTables.java | 35 ++++++--
 .../apache/flink/sql/parser/dql/SqlShowViews.java  | 66 ---------------
 .../apache/flink/table/api/TableEnvironment.java   | 10 +++
 .../table/api/internal/TableEnvironmentImpl.java   |  5 ++
 .../apache/flink/table/catalog/CatalogManager.java | 30 +++++++
 .../table/catalog/GenericInMemoryCatalog.java      | 78 +++++++----------
 .../ShowMaterializedTablesOperation.java           | 98 ++++++++++++++++++++++
 .../org/apache/flink/table/catalog/Catalog.java    | 25 +++++-
 .../operations/converters/SqlNodeConverters.java   |  1 -
 .../converters/SqlShowTablesConverter.java         | 27 +++++-
 .../converters/SqlShowViewsConverter.java          | 52 ------------
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  1 -
 .../table/planner/catalog/UnknownCatalogTest.java  |  5 +-
 .../operations/SqlOtherOperationConverterTest.java | 34 +++++++-
 .../SqlShowToOperationConverterTest.java           | 18 +++-
 20 files changed, 340 insertions(+), 225 deletions(-)

diff --git a/flink-python/pyflink/table/catalog.py 
b/flink-python/pyflink/table/catalog.py
index 8033a95ba2c..40bb06c8eb4 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -157,6 +157,18 @@ class Catalog(object):
         """
         return list(self._j_catalog.listViews(database_name))
 
+    def list_materialized_tables(self, database_name: str) -> List[str]:
+        """
+        Get names of all materialized tables under this database.
+        An empty list is returned if none exists.
+
+        :param database_name: Name of the given database.
+        :return: A list of the names of all materialized tables in the given 
database.
+        :raise: CatalogException in case of any runtime exception.
+                DatabaseNotExistException if the database does not exist.
+        """
+        return list(self._j_catalog.listMaterializedTables(database_name))
+
     def get_table(self, table_path: 'ObjectPath') -> 'CatalogBaseTable':
         """
         Get a CatalogTable or CatalogView identified by tablePath.
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 5d3debe59a5..539815974b4 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -619,6 +619,19 @@ class TableEnvironment(object):
         j_view_name_array = self._j_tenv.listViews()
         return [item for item in j_view_name_array]
 
+    def list_materialized_tables(self) -> List[str]:
+        """
+        Gets the names of all materialized tables available
+        in the current namespace (the current database of the current catalog).
+
+        :return: A list of the names of all registered materialized tables
+        in the current database of the current catalog.
+
+        .. versionadded:: 2.2.0
+        """
+        j_materialized_table_name_array = self._j_tenv.listMaterializedTables()
+        return [item for item in j_materialized_table_name_array]
+
     def list_user_defined_functions(self) -> List[str]:
         """
         Gets the names of all user defined functions registered in this 
environment.
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java
index 84e02975691..6490c633731 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java
@@ -88,6 +88,7 @@ public class SqlCommandParserImplTest {
                 TestSpec.of("SHOW CREATE VIEW (what_ever);", OTHER),
                 TestSpec.of("SHOW CREATE syntax_error;", OTHER),
                 TestSpec.of("SHOW TABLES;", OTHER),
+                TestSpec.of("SHOW MATERIALIZED TABLES;", OTHER),
                 TestSpec.of("BEGIN STATEMENT SET;", OTHER),
                 TestSpec.of("BEGIN statement;", OTHER),
                 TestSpec.of("END;", OTHER),
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 05619b382e7..478bafc4005 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -131,6 +131,7 @@
     "org.apache.flink.sql.parser.dql.SqlShowPartitions"
     "org.apache.flink.sql.parser.dql.SqlShowProcedures"
     "org.apache.flink.sql.parser.dql.SqlShowTables"
+    "org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind"
     "org.apache.flink.sql.parser.dql.SqlShowColumns"
     "org.apache.flink.sql.parser.dql.SqlShowCreate"
     "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable"
@@ -138,7 +139,6 @@
     "org.apache.flink.sql.parser.dql.SqlShowCreateTable"
     "org.apache.flink.sql.parser.dql.SqlShowCreateView"
     "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
-    "org.apache.flink.sql.parser.dql.SqlShowViews"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeFunction"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeModel"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
@@ -625,7 +625,6 @@
     "SqlShowModules()"
     "SqlShowPartitions()"
     "SqlShowProcedures()"
-    "SqlShowViews()"
     "SqlUnloadModule()"
     "SqlUseModules()"
     "SqlRichExplain()"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 45b21f51af6..c84ecbc0ea3 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -660,46 +660,9 @@ SqlShowProcedures SqlShowProcedures() :
     }
 }
 
-/**
- * SHOW VIEWS FROM [catalog.] database sql call.
- */
-SqlShowViews SqlShowViews() :
-{
-    SqlIdentifier databaseName = null;
-    SqlCharStringLiteral likeLiteral = null;
-    String prep = null;
-    boolean notLike = false;
-    SqlParserPos pos;
-}
-{
-    <SHOW> <VIEWS>
-    { pos = getPos(); }
-    [
-        ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
-        { pos = getPos(); }
-        databaseName = CompoundIdentifier()
-    ]
-    [
-        [
-            <NOT>
-            {
-                notLike = true;
-            }
-        ]
-        <LIKE> <QUOTED_STRING>
-        {
-            String likeCondition = SqlParserUtil.parseString(token.image);
-            likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
-        }
-    ]
-    {
-        return new SqlShowViews(pos, prep, databaseName, notLike, likeLiteral);
-    }
-}
-
 /**
 * Parses a show tables statement.
-* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE 
pattern ];
+* SHOW ( VIEWS | [ MATERIALIZED ] TABLES ) [ ( FROM | IN ) 
[catalog_name.]database_name ] [ [NOT] LIKE pattern ];
 */
 SqlShowTables SqlShowTables() :
 {
@@ -708,9 +671,17 @@ SqlShowTables SqlShowTables() :
     String prep = null;
     boolean notLike = false;
     SqlParserPos pos;
+    SqlTableKind kind;
 }
 {
-    <SHOW> <TABLES>
+    <SHOW>
+    (
+           <TABLES> { kind = SqlTableKind.TABLE; }
+       |
+           <VIEWS> { kind = SqlTableKind.VIEW; }
+       |
+           <MATERIALIZED> <TABLES> { kind = SqlTableKind.MATERIALIZED_TABLE; }
+    )
     { pos = getPos(); }
     [
         ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
@@ -731,7 +702,7 @@ SqlShowTables SqlShowTables() :
         }
     ]
     {
-        return new SqlShowTables(pos, prep, databaseName, notLike, 
likeLiteral);
+        return new SqlShowTables(pos, kind, prep, databaseName, notLike, 
likeLiteral);
     }
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
index 4c1d8592a35..626e21f9348 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
@@ -29,17 +29,17 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  * SHOW TABLES sql call. The full syntax for show functions is as followings:
  *
  * <pre>{@code
- * SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE
+ * SHOW ( VIEWS | [ MATERIALIZED ] TABLES ) [ ( FROM | IN ) 
[catalog_name.]database_name ] [ [NOT] LIKE
  * <sql_like_pattern> ] statement
  * }</pre>
  */
 public class SqlShowTables extends SqlShowCall {
 
-    public static final SqlSpecialOperator OPERATOR =
-            new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER);
+    private final SqlTableKind kind;
 
     public SqlShowTables(
             SqlParserPos pos,
+            SqlTableKind kind,
             String preposition,
             SqlIdentifier databaseName,
             boolean notLike,
@@ -52,15 +52,40 @@ public class SqlShowTables extends SqlShowCall {
                 likeLiteral == null ? null : "LIKE",
                 likeLiteral,
                 notLike);
+        this.kind = kind;
     }
 
     @Override
     public SqlOperator getOperator() {
-        return OPERATOR;
+        return kind.getOperator();
     }
 
     @Override
     String getOperationName() {
-        return "SHOW TABLES";
+        return getOperator().getName();
+    }
+
+    public SqlTableKind getTableKind() {
+        return kind;
+    }
+
+    /**
+     * The kind of table. Keep in sync with {@link
+     * org.apache.flink.table.catalog.CatalogBaseTable.TableKind}.
+     */
+    public enum SqlTableKind {
+        MATERIALIZED_TABLE(new SqlSpecialOperator("SHOW MATERIALIZED TABLES", 
SqlKind.OTHER)),
+        TABLE(new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER)),
+        VIEW(new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER));
+
+        private final SqlSpecialOperator operator;
+
+        SqlTableKind(final SqlSpecialOperator operator) {
+            this.operator = operator;
+        }
+
+        public SqlSpecialOperator getOperator() {
+            return operator;
+        }
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
deleted file mode 100644
index 3fb593f8f04..00000000000
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.dql;
-
-import org.apache.calcite.sql.SqlCharStringLiteral;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.parser.SqlParserPos;
-
-/**
- * SHOW VIEWS sql call. The full syntax for show functions is as followings:
- *
- * <pre>{@code
- * SHOW VIEWS [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE
- * <sql_like_pattern> ] statement
- * }</pre>
- */
-public class SqlShowViews extends SqlShowCall {
-
-    public static final SqlSpecialOperator OPERATOR =
-            new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER);
-
-    public SqlShowViews(
-            SqlParserPos pos,
-            String preposition,
-            SqlIdentifier databaseName,
-            boolean notLike,
-            SqlCharStringLiteral likeLiteral) {
-        // only LIKE currently supported for SHOW VIEWS
-        super(
-                pos,
-                preposition,
-                databaseName,
-                likeLiteral == null ? null : "LIKE",
-                likeLiteral,
-                notLike);
-    }
-
-    @Override
-    public SqlOperator getOperator() {
-        return OPERATOR;
-    }
-
-    @Override
-    String getOperationName() {
-        return "SHOW VIEWS";
-    }
-}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index b0bc8538833..dada96ae619 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -1236,6 +1236,16 @@ public interface TableEnvironment {
      */
     String[] listViews();
 
+    /**
+     * Gets the names of all materialized tables available in the current 
namespace (the current
+     * database of the current catalog).
+     *
+     * @return A list of the names of all registered materialized tables in 
the current database of
+     *     the current catalog.
+     * @see #listTemporaryViews()
+     */
+    String[] listMaterializedTables();
+
     /**
      * Gets the names of all temporary tables and views available in the 
current namespace (the
      * current database of the current catalog).
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 37017b9df32..058f7a92be9 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -731,6 +731,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         return 
catalogManager.listViews().stream().sorted().toArray(String[]::new);
     }
 
+    @Override
+    public String[] listMaterializedTables() {
+        return 
catalogManager.listMaterializedTables().stream().sorted().toArray(String[]::new);
+    }
+
     @Override
     public String[] listTemporaryTables() {
         return 
catalogManager.listTemporaryTables().stream().sorted().toArray(String[]::new);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 2d6d090b3ee..c1ee34749af 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -904,6 +904,36 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                 .filter(e -> e.getValue() instanceof CatalogView);
     }
 
+    /**
+     * Returns the set of names of materialized tables registered in the 
namespace of the current
+     * catalog and database.
+     *
+     * @return names of registered materialized tables
+     */
+    public Set<String> listMaterializedTables() {
+        return listMaterializedTables(getCurrentCatalog(), 
getCurrentDatabase());
+    }
+
+    /**
+     * Returns the set of names of materialized tables registered in the 
namespace of the given
+     * catalog and database.
+     *
+     * @return names of registered materialized tables
+     */
+    public Set<String> listMaterializedTables(String catalogName, String 
databaseName) {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        if (catalog == null) {
+            throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
+        }
+
+        try {
+            return new HashSet<>(catalog.listMaterializedTables(databaseName));
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format("Database %s does not exist", databaseName), 
e);
+        }
+    }
+
     /**
      * Lists all available schemas in the root of the catalog manager. It is 
not equivalent to
      * listing all catalogs as it includes also different catalog parts of the 
temporary objects.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index a41a012f9d1..15c8f00dc91 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -325,35 +326,20 @@ public class GenericInMemoryCatalog extends 
AbstractCatalog {
 
     @Override
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "databaseName cannot be null or empty");
-
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
-        }
-
-        return tables.keySet().stream()
-                .filter(k -> k.getDatabaseName().equals(databaseName))
-                .map(k -> k.getObjectName())
-                .collect(Collectors.toList());
+        return listObjectsUnderDatabase(tables, databaseName, objectPath -> 
true);
     }
 
     @Override
     public List<String> listViews(String databaseName) throws 
DatabaseNotExistException {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "databaseName cannot be null or empty");
-
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
-        }
+        return listObjectsUnderDatabase(
+                tables, databaseName, k -> (tables.get(k) instanceof 
CatalogView));
+    }
 
-        return tables.keySet().stream()
-                .filter(k -> k.getDatabaseName().equals(databaseName))
-                .filter(k -> (tables.get(k) instanceof CatalogView))
-                .map(k -> k.getObjectName())
-                .collect(Collectors.toList());
+    @Override
+    public List<String> listMaterializedTables(String databaseName)
+            throws DatabaseNotExistException {
+        return listObjectsUnderDatabase(
+                tables, databaseName, k -> (tables.get(k) instanceof 
CatalogMaterializedTable));
     }
 
     @Override
@@ -447,18 +433,7 @@ public class GenericInMemoryCatalog extends 
AbstractCatalog {
 
     @Override
     public List<String> listModels(String databaseName) throws 
DatabaseNotExistException {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "databaseName cannot be null or empty");
-
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
-        }
-
-        return models.keySet().stream()
-                .filter(k -> k.getDatabaseName().equals(databaseName))
-                .map(k -> k.getObjectName())
-                .collect(Collectors.toList());
+        return listObjectsUnderDatabase(models, databaseName, k -> true);
     }
 
     @Override
@@ -543,18 +518,7 @@ public class GenericInMemoryCatalog extends 
AbstractCatalog {
 
     @Override
     public List<String> listFunctions(String databaseName) throws 
DatabaseNotExistException {
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "databaseName cannot be null or empty");
-
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
-        }
-
-        return functions.keySet().stream()
-                .filter(k -> k.getDatabaseName().equals(databaseName))
-                .map(k -> k.getObjectName())
-                .collect(Collectors.toList());
+        return listObjectsUnderDatabase(functions, databaseName, k -> true);
     }
 
     @Override
@@ -904,4 +868,22 @@ public class GenericInMemoryCatalog extends 
AbstractCatalog {
             throw new PartitionNotExistException(getName(), tablePath, 
partitionSpec);
         }
     }
+
+    private List<String> listObjectsUnderDatabase(
+            Map<ObjectPath, ?> map, String databaseName, Predicate<ObjectPath> 
filter)
+            throws DatabaseNotExistException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "databaseName cannot be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return map.keySet().stream()
+                .filter(k -> k.getDatabaseName().equals(databaseName))
+                .filter(filter)
+                .map(ObjectPath::getObjectName)
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java
new file mode 100644
index 00000000000..d800b9228b8
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowMaterializedTablesOperation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.utils.ShowLikeOperator;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Operation to describe a SHOW MATERIALIZED TABLES statement. The full syntax 
for SHOW MATERIALIZED
+ * TABLES is as follows:
+ *
+ * <pre>{@code
+ * SHOW MATERIALIZED TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ 
[NOT] LIKE
+ * &lt;sql_like_pattern&gt; ] statement
+ * }</pre>
+ */
+@Internal
+public class ShowMaterializedTablesOperation extends AbstractShowOperation {
+    private final @Nullable String databaseName;
+
+    public ShowMaterializedTablesOperation(
+            @Nullable String catalogName,
+            @Nullable String databaseName,
+            @Nullable String preposition,
+            @Nullable ShowLikeOperator likeOp) {
+        super(catalogName, preposition, likeOp);
+        this.databaseName = databaseName;
+    }
+
+    public ShowMaterializedTablesOperation(
+            @Nullable String catalogName,
+            @Nullable String databaseName,
+            @Nullable ShowLikeOperator likeOp) {
+        this(catalogName, databaseName, null, likeOp);
+    }
+
+    @Override
+    protected Collection<String> retrieveDataForTableResult(Context ctx) {
+        final CatalogManager catalogManager = ctx.getCatalogManager();
+        final String qualifiedCatalogName = 
catalogManager.qualifyCatalog(catalogName);
+        final String qualifiedDatabaseName = 
catalogManager.qualifyDatabase(databaseName);
+        if (preposition == null) {
+            return catalogManager.listMaterializedTables();
+        } else {
+            Catalog catalog = 
catalogManager.getCatalogOrThrowException(qualifiedCatalogName);
+            if (catalog.databaseExists(qualifiedDatabaseName)) {
+                return catalogManager.listMaterializedTables(
+                        qualifiedCatalogName, qualifiedDatabaseName);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Database '%s'.'%s' doesn't exist.",
+                                qualifiedCatalogName, qualifiedDatabaseName));
+            }
+        }
+    }
+
+    @Override
+    protected String getOperationName() {
+        return "SHOW MATERIALIZED TABLES";
+    }
+
+    @Override
+    protected String getColumnName() {
+        return "materialized table";
+    }
+
+    @Override
+    public String getPrepositionSummaryString() {
+        if (databaseName == null) {
+            return super.getPrepositionSummaryString();
+        }
+        return super.getPrepositionSummaryString() + "." + databaseName;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index bc8c8f185a3..e230b616cdb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -214,13 +214,13 @@ public interface Catalog {
     void alterDatabase(String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException;
 
-    // ------ tables and views ------
+    // ------ tables, views and materialized tables ------
 
     /**
-     * Get names of all tables and views under this database. An empty list is 
returned if none
-     * exists.
+     * Get names of all tables, views and materialized tables under this 
database. An empty list is
+     * returned if none exists.
      *
-     * @return a list of the names of all tables and views in this database
+     * @return a list of the names of all tables, views and materialized 
tables in this database
      * @throws DatabaseNotExistException if the database does not exist
      * @throws CatalogException in case of any runtime exception
      */
@@ -236,6 +236,23 @@ public interface Catalog {
      */
     List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException;
 
+    /**
+     * Get names of all materialized tables under this database. An empty list 
is returned if none
+     * exists.
+     *
+     * @param databaseName the name of the given database
+     * @return a list of the names of all materialized tables in the given 
database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default List<String> listMaterializedTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "listMaterializedTables(String) is not implemented for 
%s.",
+                        this.getClass()));
+    }
+
     /**
      * Returns a {@link CatalogTable} or {@link CatalogView} identified by the 
given {@link
      * ObjectPath}. The framework will resolve the metadata objects when 
necessary.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 26fe98680fe..9f67c8bde10 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -73,7 +73,6 @@ public class SqlNodeConverters {
         register(new SqlDropMaterializedTableConverter());
         register(new SqlDropModelConverter());
         register(new SqlShowTablesConverter());
-        register(new SqlShowViewsConverter());
         register(new SqlShowCatalogsConverter());
         register(new SqlDescribeFunctionConverter());
         register(new SqlDescribeModelConverter());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java
index d252b513085..1cf3fd67536 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowTablesConverter.java
@@ -19,8 +19,11 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.dql.SqlShowTables;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowMaterializedTablesOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.utils.ShowLikeOperator;
 
 import javax.annotation.Nullable;
@@ -32,7 +35,17 @@ public class SqlShowTablesConverter extends AbstractSqlShowConverter<SqlShowTabl
             @Nullable String catalogName,
             @Nullable String databaseName,
             @Nullable ShowLikeOperator likeOp) {
-        return new ShowTablesOperation(catalogName, databaseName, likeOp);
+        switch (sqlShowCall.getTableKind()) {
+            case MATERIALIZED_TABLE:
+                return new ShowMaterializedTablesOperation(catalogName, databaseName, likeOp);
+            case TABLE:
+                return new ShowTablesOperation(catalogName, databaseName, likeOp);
+            case VIEW:
+                return new ShowViewsOperation(catalogName, databaseName, likeOp);
+            default:
+                throw new ValidationException(
+                        "Not supported table kind " + sqlShowCall.getTableKind() + " yet");
+        }
     }
 
     @Override
@@ -42,7 +55,17 @@ public class SqlShowTablesConverter extends AbstractSqlShowConverter<SqlShowTabl
             @Nullable String databaseName,
             String prep,
             @Nullable ShowLikeOperator likeOp) {
-        return new ShowTablesOperation(catalogName, databaseName, prep, likeOp);
+        switch (sqlShowCall.getTableKind()) {
+            case MATERIALIZED_TABLE:
+                return new ShowMaterializedTablesOperation(catalogName, databaseName, prep, likeOp);
+            case TABLE:
+                return new ShowTablesOperation(catalogName, databaseName, prep, likeOp);
+            case VIEW:
+                return new ShowViewsOperation(catalogName, databaseName, prep, likeOp);
+            default:
+                throw new ValidationException(
+                        "Not supported table kind " + sqlShowCall.getTableKind() + " yet");
+        }
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java
deleted file mode 100644
index b2f89a9a737..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowViewsConverter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations.converters;
-
-import org.apache.flink.sql.parser.dql.SqlShowViews;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ShowViewsOperation;
-import org.apache.flink.table.operations.utils.ShowLikeOperator;
-
-import javax.annotation.Nullable;
-
-public class SqlShowViewsConverter extends AbstractSqlShowConverter<SqlShowViews> {
-    @Override
-    public Operation getOperationWithoutPrep(
-            SqlShowViews sqlShowCall,
-            @Nullable String catalogName,
-            @Nullable String databaseName,
-            @Nullable ShowLikeOperator likeOp) {
-        return new ShowViewsOperation(catalogName, databaseName, likeOp);
-    }
-
-    @Override
-    public Operation getOperation(
-            SqlShowViews sqlShowCall,
-            @Nullable String catalogName,
-            @Nullable String databaseName,
-            String prep,
-            @Nullable ShowLikeOperator likeOp) {
-        return new ShowViewsOperation(catalogName, databaseName, prep, likeOp);
-    }
-
-    @Override
-    public Operation convertSqlNode(SqlShowViews sqlShowViews, ConvertContext context) {
-        return convertShowOperation(sqlShowViews, context);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 9d8570848e7..ee329a0ecd6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -144,7 +144,6 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowFunctions]
         || sqlNode.isInstanceOf[SqlShowJars]
         || sqlNode.isInstanceOf[SqlShowModules]
-        || sqlNode.isInstanceOf[SqlShowViews]
         || sqlNode.isInstanceOf[SqlShowColumns]
         || sqlNode.isInstanceOf[SqlShowPartitions]
         || sqlNode.isInstanceOf[SqlShowProcedures]
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
index 42df8dda6c9..756e429ee08 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java
@@ -118,7 +118,9 @@ class UnknownCatalogTest {
                 "SHOW PROCEDURES",
                 "SHOW PROCEDURES IN db",
                 "SHOW COLUMNS IN db",
-                "SHOW DATABASES"
+                "SHOW DATABASES",
+                "SHOW MATERIALIZED TABLES",
+                "SHOW MATERIALIZED TABLES IN db"
             })
     void showForUnsetCatalog(String sql) {
         TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS);
@@ -133,6 +135,7 @@ class UnknownCatalogTest {
     @ValueSource(
             strings = {
                 "SHOW TABLES",
+                "SHOW MATERIALIZED TABLES",
                 "SHOW VIEWS",
                 "SHOW PROCEDURES",
                 // Here `db` is considered as object name
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
index 07e62ca75eb..56c9e0a95e9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.operations.ShowCreateCatalogOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
+import org.apache.flink.table.operations.ShowMaterializedTablesOperation;
 import org.apache.flink.table.operations.ShowModulesOperation;
 import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowProceduresOperation;
@@ -73,7 +74,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  * Test cases for the statements that neither belong to DDL nor DML for {@link
  * SqlNodeToOperationConversion}.
  */
-public class SqlOtherOperationConverterTest extends SqlNodeToOperationConversionTestBase {
+class SqlOtherOperationConverterTest extends SqlNodeToOperationConversionTestBase {
 
     @Test
     void testUseCatalog() {
@@ -307,6 +308,37 @@ public class SqlOtherOperationConverterTest extends SqlNodeToOperationConversion
                         "SHOW VIEWS"));
     }
 
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("inputForShowMaterializedTablesTest")
+    void testShowMaterializedTables(
+            String sql, ShowMaterializedTablesOperation expected, String expectedSummary) {
+        Operation operation = parse(sql);
+        assertThat(operation)
+                .isInstanceOf(ShowMaterializedTablesOperation.class)
+                .isEqualTo(expected);
+        assertThat(operation.asSummaryString()).isEqualTo(expectedSummary);
+    }
+
+    private static Stream<Arguments> inputForShowMaterializedTablesTest() {
+        return Stream.of(
+                Arguments.of(
+                        "SHOW MATERIALIZED TABLES from cat1.db1 not like 't%'",
+                        new ShowMaterializedTablesOperation(
+                                "cat1",
+                                "db1",
+                                "FROM",
+                                ShowLikeOperator.of(LikeType.NOT_LIKE, "t%")),
+                        "SHOW MATERIALIZED TABLES FROM cat1.db1 NOT LIKE 't%'"),
+                Arguments.of(
+                        "SHOW MATERIALIZED TABLES in db2",
+                        new ShowMaterializedTablesOperation("builtin", "db2", "IN", null),
+                        "SHOW MATERIALIZED TABLES IN builtin.db2"),
+                Arguments.of(
+                        "SHOW MATERIALIZED TABLES",
+                        new ShowMaterializedTablesOperation("builtin", "default", null, null),
+                        "SHOW MATERIALIZED TABLES"));
+    }
+
     @Test
     void testShowCreateCatalog() {
         Operation operation = parse("show create catalog cat1");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
index e625df03ccd..60606aa9771 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
@@ -42,7 +42,14 @@ public class SqlShowToOperationConverterTest extends SqlNodeToOperationConversio
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"SHOW TABLES", "SHOW VIEWS", "SHOW FUNCTIONS", "SHOW PROCEDURES"})
+    @ValueSource(
+            strings = {
+                "SHOW TABLES",
+                "SHOW VIEWS",
+                "SHOW FUNCTIONS",
+                "SHOW PROCEDURES",
+                "SHOW MATERIALIZED TABLES"
+            })
     void testParseShowFunctionForUnsetCatalog(String sql) {
         catalogManager.setCurrentCatalog(null);
         // No exception should be thrown during parsing.
@@ -51,7 +58,14 @@ public class SqlShowToOperationConverterTest extends SqlNodeToOperationConversio
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"SHOW TABLES", "SHOW VIEWS", "SHOW FUNCTIONS", "SHOW PROCEDURES"})
+    @ValueSource(
+            strings = {
+                "SHOW TABLES",
+                "SHOW VIEWS",
+                "SHOW FUNCTIONS",
+                "SHOW PROCEDURES",
+                "SHOW MATERIALIZED TABLES"
+            })
     void testParseShowFunctionForUnsetDatabase(String sql) {
         catalogManager.setCurrentDatabase(null);
         // No exception should be thrown during parsing.


Reply via email to