This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04b385b3a080ef8888cfdcb3071e01131223bf4f
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Wed Aug 14 13:14:52 2024 +0200

    [FLINK-36050][table] Enhance `SHOW VIEW` syntax
---
 docs/content.zh/docs/dev/table/sql/show.md         |  11 ++-
 docs/content/docs/dev/table/sql/show.md            |  11 ++-
 .../src/test/resources/sql/catalog_database.q      |  49 ++++++++-
 .../src/test/resources/sql/catalog_database.q      |  59 ++++++++++-
 .../src/main/codegen/includes/parserImpls.ftl      |  29 +++++-
 .../apache/flink/sql/parser/dql/SqlShowTables.java |  13 +--
 .../apache/flink/sql/parser/dql/SqlShowViews.java  |  70 ++++++++++++-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  32 ++++++
 .../table/operations/ShowTablesOperation.java      |   7 +-
 .../flink/table/operations/ShowViewsOperation.java | 110 ++++++++++++++++++++-
 .../operations/SqlNodeToOperationConversion.java   |  37 +++++--
 .../operations/SqlOtherOperationConverterTest.java |  37 +++++++
 .../flink/table/api/TableEnvironmentTest.scala     |  31 ++++++
 13 files changed, 457 insertions(+), 39 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/show.md 
b/docs/content.zh/docs/dev/table/sql/show.md
index 2c24cbb6759..1b9e0460987 100644
--- a/docs/content.zh/docs/dev/table/sql/show.md
+++ b/docs/content.zh/docs/dev/table/sql/show.md
@@ -913,10 +913,17 @@ SHOW PROCEDURES [ ( FROM | IN ) 
[catalog_name.]database_name ] [ [NOT] (LIKE | I
 ## SHOW VIEWS
 
 ```sql
-SHOW VIEWS
+SHOW VIEWS [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE 
<sql_like_pattern> ]
 ```
 
-展示当前 catalog 和当前 database 中所有的视图。
+Show all views for an optionally specified database. If no database is 
specified then the views are returned from the current database. Additionally, 
the output of this statement may be filtered by an optional matching pattern.
+
+**LIKE**
+Show all views with given view name and optional `LIKE` clause, whose name is 
whether similar to the `<sql_like_pattern>`.
+
+The syntax of sql pattern in `LIKE` clause is the same as that of `MySQL` 
dialect.
+* `%` matches any number of characters, even zero characters, `\%` matches one 
`%` character.
+* `_` matches exactly one character, `\_` matches one `_` character.
 
 ## SHOW CREATE VIEW
 
diff --git a/docs/content/docs/dev/table/sql/show.md 
b/docs/content/docs/dev/table/sql/show.md
index e0b3a9d7fb8..6ba1fa40828 100644
--- a/docs/content/docs/dev/table/sql/show.md
+++ b/docs/content/docs/dev/table/sql/show.md
@@ -923,10 +923,17 @@ The same behavior as `LIKE` but the SQL pattern is 
case-insensitive.
 ## SHOW VIEWS
 
 ```sql
-SHOW VIEWS
+SHOW VIEWS [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE 
<sql_like_pattern> ]
 ```
 
-Show all views in the current catalog and the current database.
+Show all views for an optionally specified database. If no database is 
specified then the views are returned from the current database. Additionally, 
the output of this statement may be filtered by an optional matching pattern.
+
+**LIKE**
+Show all views with given view name and optional `LIKE` clause, whose name is 
whether similar to the `<sql_like_pattern>`.
+
+The syntax of sql pattern in `LIKE` clause is the same as that of `MySQL` 
dialect.
+* `%` matches any number of characters, even zero characters, `\%` matches one 
`%` character.
+* `_` matches exactly one character, `\_` matches one `_` character.
 
 ## SHOW CREATE VIEW
 
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 0ddaa1a46ae..cde20202ee3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -802,7 +802,7 @@ show tables;
 !ok
 
 # ==========================================================================
-# test enhanced show tables
+# test enhanced show tables and views
 # ==========================================================================
 
 create catalog catalog1 with ('type'='generic_in_memory');
@@ -829,6 +829,10 @@ create view catalog1.db1.v_person as select * from 
catalog1.db1.person;
 [INFO] Execute statement succeeded.
 !info
 
+create view catalog1.db1.v_address comment 'view comment' as select * from 
catalog1.db1.address;
+[INFO] Execute statement succeeded.
+!info
+
 show tables from catalog1.db1;
 +------------+
 | table name |
@@ -836,9 +840,20 @@ show tables from catalog1.db1;
 |    address |
 |        dim |
 |     person |
+|  v_address |
 |   v_person |
 +------------+
-4 rows in set
+5 rows in set
+!ok
+
+show views from catalog1.db1;
++-----------+
+| view name |
++-----------+
+| v_address |
+|  v_person |
++-----------+
+2 rows in set
 !ok
 
 show tables from catalog1.db1 like '%person%';
@@ -851,14 +866,33 @@ show tables from catalog1.db1 like '%person%';
 2 rows in set
 !ok
 
+show views from catalog1.db1 like '%person%';
++-----------+
+| view name |
++-----------+
+|  v_person |
++-----------+
+1 row in set
+!ok
+
 show tables in catalog1.db1 not like '%person%';
 +------------+
 | table name |
 +------------+
 |    address |
 |        dim |
+|  v_address |
 +------------+
-2 rows in set
+3 rows in set
+!ok
+
+show views in catalog1.db1 not like '%person%';
++-----------+
+| view name |
++-----------+
+| v_address |
++-----------+
+1 row in set
 !ok
 
 use catalog catalog1;
@@ -873,3 +907,12 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
+
+show views from db1 like '%p_r%';
++-----------+
+| view name |
++-----------+
+|  v_person |
++-----------+
+1 row in set
+!ok
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index c79ed374590..3053d4d6b05 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -928,7 +928,7 @@ show tables;
 !ok
 
 # ==========================================================================
-# test enhanced show tables
+# test enhanced show tables and views
 # ==========================================================================
 
 create catalog catalog1 with ('type'='generic_in_memory');
@@ -991,6 +991,16 @@ create view catalog1.db1.v_person as select * from 
catalog1.db1.person;
 1 row in set
 !ok
 
+create view catalog1.db1.v_address comment 'view comment' as select * from 
catalog1.db1.address;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
 show tables from catalog1.db1;
 !output
 +------------+
@@ -999,9 +1009,21 @@ show tables from catalog1.db1;
 |    address |
 |        dim |
 |     person |
+|  v_address |
 |   v_person |
 +------------+
-4 rows in set
+5 rows in set
+!ok
+
+show views from catalog1.db1;
+!output
++-----------+
+| view name |
++-----------+
+| v_address |
+|  v_person |
++-----------+
+2 rows in set
 !ok
 
 show tables from catalog1.db1 like '%person%';
@@ -1015,6 +1037,16 @@ show tables from catalog1.db1 like '%person%';
 2 rows in set
 !ok
 
+show views from catalog1.db1 like '%person%';
+!output
++-----------+
+| view name |
++-----------+
+|  v_person |
++-----------+
+1 row in set
+!ok
+
 show tables in catalog1.db1 not like '%person%';
 !output
 +------------+
@@ -1022,8 +1054,19 @@ show tables in catalog1.db1 not like '%person%';
 +------------+
 |    address |
 |        dim |
+|  v_address |
 +------------+
-2 rows in set
+3 rows in set
+!ok
+
+show views in catalog1.db1 not like '%person%';
+!output
++-----------+
+| view name |
++-----------+
+| v_address |
++-----------+
+1 row in set
 !ok
 
 use catalog catalog1;
@@ -1045,3 +1088,13 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
+
+show views from db1 like '%p_r%';
+!output
++-----------+
+| view name |
++-----------+
+|  v_person |
++-----------+
+1 row in set
+!ok
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 2c7220a34b1..4822f73ed94 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -626,16 +626,39 @@ SqlShowProcedures SqlShowProcedures() :
 }
 
 /**
- * Parse a "Show Views" metadata query command.
+ * SHOW VIEWS FROM [catalog.] database sql call.
  */
 SqlShowViews SqlShowViews() :
 {
+    SqlIdentifier databaseName = null;
+    SqlCharStringLiteral likeLiteral = null;
+    String prep = null;
+    boolean notLike = false;
     SqlParserPos pos;
 }
 {
-    <SHOW> <VIEWS> { pos = getPos(); }
+    <SHOW> <VIEWS>
+    { pos = getPos(); }
+    [
+        ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
+        { pos = getPos(); }
+        databaseName = CompoundIdentifier()
+    ]
+    [
+        [
+            <NOT>
+            {
+                notLike = true;
+            }
+        ]
+        <LIKE> <QUOTED_STRING>
+        {
+            String likeCondition = SqlParserUtil.parseString(token.image);
+            likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
+        }
+    ]
     {
-        return new SqlShowViews(pos);
+        return new SqlShowViews(pos, prep, databaseName, notLike, likeLiteral);
     }
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
index 59a9ebd82a7..09b0965b49f 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowTables.java
@@ -101,10 +101,8 @@ public class SqlShowTables extends SqlCall {
                 : Collections.singletonList(databaseName);
     }
 
-    public String[] fullDatabaseName() {
-        return Objects.isNull(this.databaseName)
-                ? new String[] {}
-                : databaseName.names.toArray(new String[0]);
+    public List<String> fullDatabaseName() {
+        return Objects.isNull(this.databaseName) ? Collections.emptyList() : 
databaseName.names;
     }
 
     @Override
@@ -116,11 +114,8 @@ public class SqlShowTables extends SqlCall {
             databaseName.unparse(writer, leftPrec, rightPrec);
         }
         if (isWithLike()) {
-            if (isNotLike()) {
-                writer.keyword(String.format("NOT LIKE '%s'", 
getLikeSqlPattern()));
-            } else {
-                writer.keyword(String.format("LIKE '%s'", 
getLikeSqlPattern()));
-            }
+            final String prefix = isNotLike() ? "NOT " : "";
+            writer.keyword(String.format("%sLIKE '%s'", prefix, 
getLikeSqlPattern()));
         }
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
index 8b076b22bdc..22f9d2be157 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowViews.java
@@ -19,6 +19,8 @@
 package org.apache.flink.sql.parser.dql;
 
 import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
@@ -28,6 +30,9 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
 
 /** SHOW VIEWS sql call. */
 public class SqlShowViews extends SqlCall {
@@ -35,8 +40,53 @@ public class SqlShowViews extends SqlCall {
     public static final SqlSpecialOperator OPERATOR =
             new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER);
 
+    protected final String preposition;
+    protected final SqlIdentifier databaseName;
+    protected final boolean notLike;
+    protected final SqlCharStringLiteral likeLiteral;
+
     public SqlShowViews(SqlParserPos pos) {
         super(pos);
+        this.preposition = null;
+        this.databaseName = null;
+        this.notLike = false;
+        this.likeLiteral = null;
+    }
+
+    public SqlShowViews(
+            SqlParserPos pos,
+            String preposition,
+            SqlIdentifier databaseName,
+            boolean notLike,
+            SqlCharStringLiteral likeLiteral) {
+        super(pos);
+        this.preposition = preposition;
+        this.databaseName =
+                preposition != null
+                        ? requireNonNull(databaseName, "Database must not be 
null")
+                        : null;
+        this.notLike = notLike;
+        this.likeLiteral = likeLiteral;
+    }
+
+    public String getLikeSqlPattern() {
+        return likeLiteral == null ? null : 
likeLiteral.getValueAs(String.class);
+    }
+
+    public boolean isNotLike() {
+        return notLike;
+    }
+
+    public SqlCharStringLiteral getLikeLiteral() {
+        return likeLiteral;
+    }
+
+    public boolean isWithLike() {
+        return Objects.nonNull(likeLiteral);
+    }
+
+    public String getPreposition() {
+        return preposition;
     }
 
     @Override
@@ -46,11 +96,27 @@ public class SqlShowViews extends SqlCall {
 
     @Override
     public List<SqlNode> getOperandList() {
-        return Collections.EMPTY_LIST;
+        return databaseName == null
+                ? Collections.emptyList()
+                : Collections.singletonList(databaseName);
+    }
+
+    public List<String> fullDatabaseName() {
+        return databaseName == null ? Collections.emptyList() : 
databaseName.names;
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("SHOW VIEWS");
+        if (preposition == null) {
+            writer.keyword("SHOW VIEWS");
+        } else if (databaseName != null) {
+            writer.keyword("SHOW VIEWS " + preposition);
+            databaseName.unparse(writer, leftPrec, rightPrec);
+        }
+
+        if (isWithLike()) {
+            final String prefix = isNotLike() ? "NOT " : "";
+            writer.keyword(String.format("%sLIKE '%s'", prefix, 
getLikeSqlPattern()));
+        }
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a880ea76c14..7e29a0548e6 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2301,6 +2301,38 @@ class FlinkSqlParserImplTest extends SqlParserTest {
     @Test
     void testShowViews() {
         sql("show views").ok("SHOW VIEWS");
+        sql("show views not like '%'").ok("SHOW VIEWS NOT LIKE '%'");
+
+        sql("show views from db1").ok("SHOW VIEWS FROM `DB1`");
+        sql("show views in db1").ok("SHOW VIEWS IN `DB1`");
+
+        sql("show views from catalog1.db1").ok("SHOW VIEWS FROM 
`CATALOG1`.`DB1`");
+        sql("show views in catalog1.db1").ok("SHOW VIEWS IN `CATALOG1`.`DB1`");
+
+        sql("show views from db1 like '%'").ok("SHOW VIEWS FROM `DB1` LIKE 
'%'");
+        sql("show views in db1 like '%'").ok("SHOW VIEWS IN `DB1` LIKE '%'");
+
+        sql("show views from catalog1.db1 like '%'")
+                .ok("SHOW VIEWS FROM `CATALOG1`.`DB1` LIKE '%'");
+        sql("show views in catalog1.db1 like '%'").ok("SHOW VIEWS IN 
`CATALOG1`.`DB1` LIKE '%'");
+
+        sql("show views from db1 not like '%'").ok("SHOW VIEWS FROM `DB1` NOT 
LIKE '%'");
+        sql("show views in db1 not like '%'").ok("SHOW VIEWS IN `DB1` NOT LIKE 
'%'");
+
+        sql("show views from catalog1.db1 not like '%'")
+                .ok("SHOW VIEWS FROM `CATALOG1`.`DB1` NOT LIKE '%'");
+        sql("show views in catalog1.db1 not like '%'")
+                .ok("SHOW VIEWS IN `CATALOG1`.`DB1` NOT LIKE '%'");
+
+        sql("show views ^db1^").fails("(?s).*Encountered \"db1\" at line 1, 
column 12.\n.*");
+        sql("show views ^catalog1^.db1")
+                .fails("(?s).*Encountered \"catalog1\" at line 1, column 
12.\n.*");
+
+        sql("show views ^search^ db1")
+                .fails("(?s).*Encountered \"search\" at line 1, column 
12.\n.*");
+
+        sql("show views from db1 ^likes^ '%t'")
+                .fails("(?s).*Encountered \"likes\" at line 1, column 
21.\n.*");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
index 2cb7868491a..5b362837015 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
@@ -106,11 +106,8 @@ public class ShowTablesOperation implements ShowOperation {
             builder.append(String.format(" %s %s.%s", preposition, 
catalogName, databaseName));
         }
         if (this.useLike) {
-            if (notLike) {
-                builder.append(String.format(" %s LIKE %s", "NOT", 
likePattern));
-            } else {
-                builder.append(String.format(" LIKE %s", likePattern));
-            }
+            final String prefix = notLike ? "NOT " : "";
+            builder.append(String.format(" %sLIKE '%s'", prefix, likePattern));
         }
         return builder.toString();
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
index c1a19552c33..d609e6e6694 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
@@ -19,23 +19,125 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.functions.SqlLikeUtils;
+
+import java.util.Set;
 
 import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Operation to describe a SHOW VIEWS statement. */
 @Internal
 public class ShowViewsOperation implements ShowOperation {
 
+    private final String catalogName;
+    private final String databaseName;
+    private final boolean useLike;
+    private final boolean notLike;
+    private final String likePattern;
+    private final String preposition;
+
+    public ShowViewsOperation() {
+        catalogName = null;
+        databaseName = null;
+        useLike = false;
+        notLike = false;
+        likePattern = null;
+        preposition = null;
+    }
+
+    public ShowViewsOperation(String likePattern, boolean useLike, boolean 
notLike) {
+        this.catalogName = null;
+        this.databaseName = null;
+        this.useLike = useLike;
+        this.notLike = notLike;
+        this.likePattern =
+                useLike ? checkNotNull(likePattern, "Like pattern must not be 
null.") : null;
+        this.preposition = null;
+    }
+
+    public ShowViewsOperation(
+            String catalogName,
+            String databaseName,
+            String likePattern,
+            boolean useLike,
+            boolean notLike,
+            String preposition) {
+        this.catalogName = checkNotNull(catalogName, "Catalog name must not be 
null.");
+        this.databaseName = checkNotNull(databaseName, "Database name must not 
be null");
+        this.useLike = useLike;
+        this.notLike = notLike;
+        this.likePattern =
+                useLike ? checkNotNull(likePattern, "Like pattern must not be 
null.") : null;
+        this.preposition = checkNotNull(preposition, "Preposition must not be 
null");
+    }
+
+    public String getLikePattern() {
+        return likePattern;
+    }
+
+    public String getPreposition() {
+        return preposition;
+    }
+
+    public boolean isUseLike() {
+        return useLike;
+    }
+
+    public boolean isNotLike() {
+        return notLike;
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
     @Override
     public String asSummaryString() {
-        return "SHOW VIEWS";
+        StringBuilder builder = new StringBuilder().append("SHOW VIEWS");
+        if (preposition != null) {
+            builder.append(String.format(" %s %s.%s", preposition, 
catalogName, databaseName));
+        }
+        if (useLike) {
+            final String prefix = notLike ? "NOT " : "";
+            builder.append(String.format(" %sLIKE '%s'", prefix, likePattern));
+        }
+        return builder.toString();
     }
 
     @Override
     public TableResultInternal execute(Context ctx) {
-        String[] views =
-                
ctx.getCatalogManager().listViews().stream().sorted().toArray(String[]::new);
-        return buildStringArrayResult("view name", views);
+        final Set<String> views;
+        if (preposition == null) {
+            views = ctx.getCatalogManager().listViews();
+        } else {
+            Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(catalogName);
+            if (catalog.databaseExists(databaseName)) {
+                views = ctx.getCatalogManager().listViews(catalogName, 
databaseName);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Database '%s'.'%s' doesn't exist.", 
catalogName, databaseName));
+            }
+        }
+
+        final String[] rows;
+        if (useLike) {
+            rows =
+                    views.stream()
+                            .filter(row -> notLike != SqlLikeUtils.like(row, 
likePattern, "\\"))
+                            .sorted()
+                            .toArray(String[]::new);
+        } else {
+            rows = views.stream().sorted().toArray(String[]::new);
+        }
+        return buildStringArrayResult("view name", rows);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 1a7e4b16317..a4937a67070 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -939,19 +939,19 @@ public class SqlNodeToOperationConversion {
                     sqlShowTables.isWithLike(),
                     sqlShowTables.isNotLike());
         }
-        String[] fullDatabaseName = sqlShowTables.fullDatabaseName();
-        if (fullDatabaseName.length > 2) {
+        List<String> fullDatabaseName = sqlShowTables.fullDatabaseName();
+        if (fullDatabaseName.size() > 2) {
             throw new ValidationException(
                     String.format(
                             "show tables from/in identifier [ %s ] format 
error",
                             String.join(".", fullDatabaseName)));
         }
         String catalogName =
-                (fullDatabaseName.length == 1)
+                (fullDatabaseName.size() == 1)
                         ? catalogManager.getCurrentCatalog()
-                        : fullDatabaseName[0];
+                        : fullDatabaseName.get(0);
         String databaseName =
-                (fullDatabaseName.length == 1) ? fullDatabaseName[0] : 
fullDatabaseName[1];
+                (fullDatabaseName.size() == 1) ? fullDatabaseName.get(0) : 
fullDatabaseName.get(1);
         return new ShowTablesOperation(
                 catalogName,
                 databaseName,
@@ -1002,7 +1002,32 @@ public class SqlNodeToOperationConversion {
 
     /** Convert SHOW VIEWS statement. */
     private Operation convertShowViews(SqlShowViews sqlShowViews) {
-        return new ShowViewsOperation();
+        if (sqlShowViews.getPreposition() == null) {
+            return new ShowViewsOperation(
+                    sqlShowViews.getLikeSqlPattern(),
+                    sqlShowViews.isWithLike(),
+                    sqlShowViews.isNotLike());
+        }
+        List<String> fullDatabaseName = sqlShowViews.fullDatabaseName();
+        if (fullDatabaseName.size() > 2) {
+            throw new ValidationException(
+                    String.format(
+                            "show views from/in identifier [ %s ] format 
error",
+                            String.join(".", fullDatabaseName)));
+        }
+        String catalogName =
+                (fullDatabaseName.size() == 1)
+                        ? catalogManager.getCurrentCatalog()
+                        : fullDatabaseName.get(0);
+        String databaseName =
+                (fullDatabaseName.size() == 1) ? fullDatabaseName.get(0) : 
fullDatabaseName.get(1);
+        return new ShowViewsOperation(
+                catalogName,
+                databaseName,
+                sqlShowViews.getLikeSqlPattern(),
+                sqlShowViews.isWithLike(),
+                sqlShowViews.isNotLike(),
+                sqlShowViews.getPreposition());
     }
 
     /** Convert RICH EXPLAIN statement. */
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
index 0a9b76e5f74..2b641c78496 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.operations.ShowModulesOperation;
 import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowProceduresOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
@@ -199,6 +200,8 @@ public class SqlOtherOperationConverterTest extends 
SqlNodeToOperationConversion
         assertThat(showTablesOperation.getPreposition()).isEqualTo("FROM");
         assertThat(showTablesOperation.isUseLike()).isTrue();
         assertThat(showTablesOperation.isNotLike()).isTrue();
+        assertThat(showTablesOperation.asSummaryString())
+                .isEqualTo("SHOW TABLES FROM cat1.db1 NOT LIKE 't%'");
 
         final String sql2 = "SHOW TABLES in db2";
         showTablesOperation = (ShowTablesOperation) parse(sql2);
@@ -207,12 +210,46 @@ public class SqlOtherOperationConverterTest extends 
SqlNodeToOperationConversion
         assertThat(showTablesOperation.getPreposition()).isEqualTo("IN");
         assertThat(showTablesOperation.isUseLike()).isFalse();
         assertThat(showTablesOperation.isNotLike()).isFalse();
+        assertThat(showTablesOperation.asSummaryString()).isEqualTo("SHOW 
TABLES IN builtin.db2");
 
         final String sql3 = "SHOW TABLES";
         showTablesOperation = (ShowTablesOperation) parse(sql3);
         assertThat(showTablesOperation.getCatalogName()).isNull();
         assertThat(showTablesOperation.getDatabaseName()).isNull();
         assertThat(showTablesOperation.getPreposition()).isNull();
+        assertThat(showTablesOperation.asSummaryString()).isEqualTo("SHOW 
TABLES");
+    }
+
+    @Test
+    void testShowViews() {
+        final String sql = "SHOW VIEWS from cat1.db1 not like 't%'";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowViewsOperation.class);
+
+        ShowViewsOperation showViewsOperation = (ShowViewsOperation) operation;
+        assertThat(showViewsOperation.getCatalogName()).isEqualTo("cat1");
+        assertThat(showViewsOperation.getDatabaseName()).isEqualTo("db1");
+        assertThat(showViewsOperation.getPreposition()).isEqualTo("FROM");
+        assertThat(showViewsOperation.isUseLike()).isTrue();
+        assertThat(showViewsOperation.isNotLike()).isTrue();
+        assertThat(showViewsOperation.asSummaryString())
+                .isEqualTo("SHOW VIEWS FROM cat1.db1 NOT LIKE 't%'");
+
+        final String sql2 = "SHOW VIEWS in db2";
+        showViewsOperation = (ShowViewsOperation) parse(sql2);
+        assertThat(showViewsOperation.getCatalogName()).isEqualTo("builtin");
+        assertThat(showViewsOperation.getDatabaseName()).isEqualTo("db2");
+        assertThat(showViewsOperation.getPreposition()).isEqualTo("IN");
+        assertThat(showViewsOperation.isUseLike()).isFalse();
+        assertThat(showViewsOperation.isNotLike()).isFalse();
+        assertThat(showViewsOperation.asSummaryString()).isEqualTo("SHOW VIEWS 
IN builtin.db2");
+
+        final String sql3 = "SHOW VIEWS";
+        showViewsOperation = (ShowViewsOperation) parse(sql3);
+        assertThat(showViewsOperation.getCatalogName()).isNull();
+        assertThat(showViewsOperation.getDatabaseName()).isNull();
+        assertThat(showViewsOperation.getPreposition()).isNull();
+        assertThat(showViewsOperation.asSummaryString()).isEqualTo("SHOW 
VIEWS");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 9a14f0cc071..7c377393d69 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1480,6 +1480,37 @@ class TableEnvironmentTest {
     checkData(util.Arrays.asList(Row.of("person")).iterator(), 
tableResult3.collect())
   }
 
+  @Test
+  def testExecuteSqlWithEnhancedShowViews(): Unit = {
+    val createCatalogResult =
+      tableEnv.executeSql("CREATE CATALOG catalog1 
WITH('type'='generic_in_memory')")
+    assertEquals(ResultKind.SUCCESS, createCatalogResult.getResultKind)
+
+    val createDbResult = tableEnv.executeSql("CREATE database catalog1.db1")
+    assertEquals(ResultKind.SUCCESS, createDbResult.getResultKind)
+
+    val createTableStmt =
+      """
+        |CREATE VIEW catalog1.db1.view1 AS SELECT 1, 'abc'
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE VIEW catalog1.db1.view2 AS SELECT 123
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = tableEnv.executeSql("SHOW VIEWS FROM catalog1.db1 like 
'%w1'")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    assertEquals(
+      ResolvedSchema.of(Column.physical("view name", DataTypes.STRING())),
+      tableResult3.getResolvedSchema)
+    checkData(util.Arrays.asList(Row.of("view1")).iterator(), 
tableResult3.collect())
+  }
+
   @Test
   def testExecuteSqlWithShowFunctions(): Unit = {
     val tableResult = tableEnv.executeSql("SHOW FUNCTIONS")

Reply via email to