This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 36366f7  [FLINK-21579][table-api] Support "SHOW USER FUNCTIONS" 
statement in Table API and SQL Client
36366f7 is described below

commit 36366f77c81f7ed25e989d7495e3394aa1707788
Author: yulei0824 <[email protected]>
AuthorDate: Mon Mar 15 22:28:31 2021 +0800

    [FLINK-21579][table-api] Support "SHOW USER FUNCTIONS" statement in Table 
API and SQL Client
    
    This closes #15110
---
 docs/content.zh/docs/dev/table/sql/show.md         |  49 +-
 docs/content/docs/dev/table/sql/show.md            |  49 +-
 .../apache/flink/table/client/cli/CliClient.java   |   6 +-
 .../apache/flink/table/client/cli/CliStrings.java  |   2 +-
 .../flink/table/client/cli/SqlCommandParser.java   |   1 -
 .../table/client/cli/SqlCommandParserTest.java     |  16 +
 .../src/test/resources/sql/function.q              | 987 +--------------------
 .../flink-sql-client/src/test/resources/sql/misc.q |   2 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  16 +-
 .../parser/hive/FlinkHiveSqlParserImplTest.java    |   2 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  15 +-
 .../flink/sql/parser/dql/SqlShowFunctions.java     |  24 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   3 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  21 +-
 .../table/operations/ShowFunctionsOperation.java   |  36 +-
 .../operations/SqlToOperationConverter.java        |   4 +-
 .../operations/SqlToOperationConverterTest.java    |  21 +
 .../flink/table/api/TableEnvironmentTest.scala     |  12 +
 .../table/sqlexec/SqlToOperationConverter.java     |   4 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  15 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  21 +
 21 files changed, 282 insertions(+), 1024 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/show.md 
b/docs/content.zh/docs/dev/table/sql/show.md
index 79003bb..c398bf9 100644
--- a/docs/content.zh/docs/dev/table/sql/show.md
+++ b/docs/content.zh/docs/dev/table/sql/show.md
@@ -28,7 +28,7 @@ under the License.
 
 
 
-SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 
database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出所有的 function,包括:临时系统 
function,系统 function,临时 catalog function,当前 catalog 和 database 中的 catalog 
function。
+SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 
database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出当前 catalog 和当前 database 
中所有的 function,包括:系统 function 和用户定义的 function,或者仅仅列出当前 catalog 和当前 database 
中用户定义的 function。
 
 目前 Flink SQL 支持下列 SHOW 语句:
 - SHOW CATALOGS
@@ -138,6 +138,17 @@ tEnv.executeSql("SHOW FUNCTIONS").print();
 // |           ... |
 // +---------------+
 
+// create a user defined function
+tEnv.executeSql("CREATE FUNCTION f1 AS ...");
+// show user defined functions
+tEnv.executeSql("SHOW USER FUNCTIONS").print();
+// +---------------+
+// | function name |
+// +---------------+
+// |            f1 |
+// |           ... |
+// +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -191,6 +202,17 @@ tEnv.executeSql("SHOW FUNCTIONS").print()
 // |           ... |
 // +---------------+
 
+// create a user defined function
+tEnv.executeSql("CREATE FUNCTION f1 AS ...")
+// show user defined functions
+tEnv.executeSql("SHOW USER FUNCTIONS").print()
+// +---------------+
+// | function name |
+// +---------------+
+// |            f1 |
+// |           ... |
+// +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -244,6 +266,17 @@ table_env.execute_sql("SHOW FUNCTIONS").print()
 # |           ... |
 # +---------------+
 
+# create a user defined function
+table_env.execute_sql("CREATE FUNCTION f1 AS ...")
+# show user defined functions
+table_env.execute_sql("SHOW USER FUNCTIONS").print()
+# +---------------+
+# | function name |
+# +---------------+
+# |            f1 |
+# |           ... |
+# +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
@@ -272,6 +305,13 @@ mod
 sha256
 ...
 
+Flink SQL> CREATE FUNCTION f1 AS ...;
+[INFO] Function has been created.
+
+Flink SQL> SHOW USER FUNCTIONS;
+f1
+...
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -329,7 +369,10 @@ SHOW VIEWS
 ## SHOW FUNCTIONS
 
 ```sql
-SHOW FUNCTIONS
+SHOW [USER] FUNCTIONS
 ```
 
-展示所有的 function,包括:临时系统 function, 系统 function, 临时 catalog function,当前 catalog 和 
database 中的 catalog function。
+展示当前 catalog 和当前 database 中所有的 function,包括:系统 function 和用户定义的 function。
+
+**USER**
+仅仅展示当前 catalog 和当前 database 中用户定义的 function。
diff --git a/docs/content/docs/dev/table/sql/show.md 
b/docs/content/docs/dev/table/sql/show.md
index f1e5d5d..d972d0f 100644
--- a/docs/content/docs/dev/table/sql/show.md
+++ b/docs/content/docs/dev/table/sql/show.md
@@ -26,7 +26,7 @@ under the License.
 
 # SHOW Statements
 
-SHOW statements are used to list all catalogs, or list all databases in the 
current catalog, or list all tables/views in the current catalog and the 
current database, or show current catalog and database, or list all functions 
including temp system functions, system functions, temp catalog functions and 
catalog functions in the current catalog and the current database.
+SHOW statements are used to list all catalogs, or list all databases in the 
current catalog, or list all tables/views in the current catalog and the 
current database, or show current catalog and database, or list all functions 
including system functions and user-defined functions in the current catalog 
and current database, or list only user-defined functions in the current 
catalog and current database.
 
 Flink SQL supports the following SHOW statements for now:
 - SHOW CATALOGS
@@ -138,6 +138,17 @@ tEnv.executeSql("SHOW FUNCTIONS").print();
 // |           ... |
 // +---------------+
 
+// create a user defined function
+tEnv.executeSql("CREATE FUNCTION f1 AS ...");
+// show user defined functions
+tEnv.executeSql("SHOW USER FUNCTIONS").print();
+// +---------------+
+// | function name |
+// +---------------+
+// |            f1 |
+// |           ... |
+// +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -191,6 +202,17 @@ tEnv.executeSql("SHOW FUNCTIONS").print()
 // |           ... |
 // +---------------+
 
+// create a user defined function
+tEnv.executeSql("CREATE FUNCTION f1 AS ...")
+// show user defined functions
+tEnv.executeSql("SHOW USER FUNCTIONS").print()
+// +---------------+
+// | function name |
+// +---------------+
+// |            f1 |
+// |           ... |
+// +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -244,6 +266,17 @@ table_env.execute_sql("SHOW FUNCTIONS").print()
 # |           ... |
 # +---------------+
 
+# create a user defined function
+table_env.execute_sql("CREATE FUNCTION f1 AS ...")
+# show user defined functions
+table_env.execute_sql("SHOW USER FUNCTIONS").print()
+# +---------------+
+# | function name |
+# +---------------+
+# |            f1 |
+# |           ... |
+# +---------------+
+
 ```
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
@@ -272,6 +305,13 @@ mod
 sha256
 ...
 
+Flink SQL> CREATE FUNCTION f1 AS ...;
+[INFO] Function has been created.
+
+Flink SQL> SHOW USER FUNCTIONS;
+f1
+...
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -329,7 +369,10 @@ Show all views in the current catalog and the current 
database.
 ## SHOW FUNCTIONS
 
 ```sql
-SHOW FUNCTIONS
+SHOW [USER] FUNCTIONS
 ```
 
-Show all functions including temp system functions, system functions, temp 
catalog functions and catalog functions in the current catalog and current 
database.
+Show all functions including system functions and user-defined functions in 
the current catalog and current database.
+
+**USER**
+Show only user-defined functions in the current catalog and current database.
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 9bcf055..c10b548 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -312,7 +312,7 @@ public class CliClient implements AutoCloseable {
                 callShowViews();
                 break;
             case SHOW_FUNCTIONS:
-                callShowFunctions();
+                callShowFunctions(cmdCall);
                 break;
             case SHOW_MODULES:
                 callShowModules(cmdCall);
@@ -568,10 +568,10 @@ public class CliClient implements AutoCloseable {
         terminal.flush();
     }
 
-    private void callShowFunctions() {
+    private void callShowFunctions(SqlCommandCall cmdCall) {
         final List<String> functions;
         try {
-            functions = getShowResult("FUNCTIONS");
+            functions = getShowResult(cmdCall);
         } catch (SqlExecutionException e) {
             printExecutionException(e);
             return;
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 6a9299c..c4bf43b 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -89,7 +89,7 @@ public final class CliStrings {
                     .append(
                             formatCommand(
                                     SqlCommand.SHOW_FUNCTIONS,
-                                    "Shows all user-defined and built-in 
functions."))
+                                    "Shows all user-defined and built-in 
functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'"))
                     .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all 
registered tables."))
                     .append(
                             formatCommand(
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 3c2b21e..64ceda9 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -165,7 +165,6 @@ public final class SqlCommandParser {
             operands = new String[0];
         } else if (operation instanceof ShowFunctionsOperation) {
             cmd = SqlCommand.SHOW_FUNCTIONS;
-            operands = new String[0];
         } else if (operation instanceof ShowPartitionsOperation) {
             cmd = SqlCommand.SHOW_PARTITIONS;
         } else if (operation instanceof CreateCatalogFunctionOperation
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index bf42c5e..305d40a 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -334,6 +334,22 @@ public class SqlCommandParserTest {
                         // show full modules
                         TestItem.validSql(
                                 "SHOW FULL MODULES", SqlCommand.SHOW_MODULES, 
"SHOW FULL MODULES"),
+                        // show functions
+                        TestItem.validSql(
+                                "SHOW FUNCTIONS;", SqlCommand.SHOW_FUNCTIONS, 
"SHOW FUNCTIONS"),
+                        TestItem.validSql(
+                                "  SHOW FUNCTIONS   ;",
+                                SqlCommand.SHOW_FUNCTIONS,
+                                "SHOW FUNCTIONS"),
+                        // show user functions
+                        TestItem.validSql(
+                                "SHOW USER FUNCTIONS;",
+                                SqlCommand.SHOW_FUNCTIONS,
+                                "SHOW USER FUNCTIONS"),
+                        TestItem.validSql(
+                                "  SHOW USER FUNCTIONS   ;",
+                                SqlCommand.SHOW_FUNCTIONS,
+                                "SHOW USER FUNCTIONS"),
                         // Test create function.
                         TestItem.invalidSql(
                                 "CREATE FUNCTION ",
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q 
b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index f601379..1634854 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -20,148 +20,8 @@ create function func1 as 'LowerUDF' LANGUAGE JAVA;
 [INFO] Function has been created.
 !info
 
-# TODO: [FLINK-21579] optimize this by introducing SHOW USER FUNCTIONS
-# should contain func1
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+show user functions;
 func1
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 SET execution.result-mode=tableau;
@@ -185,147 +45,9 @@ create temporary function if not exists func2 as 
'LowerUDF' LANGUAGE JAVA;
 [INFO] Function has been created.
 !info
 
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+show user functions;
 func1
 func2
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 # ====== test function with full qualified name ======
@@ -355,147 +77,9 @@ create temporary function if not exists c1.db.func4 as 
'LowerUDF' LANGUAGE JAVA;
 !info
 
 # no func3 and func4 because we are not under catalog c1
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+show user functions;
 func1
 func2
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 use catalog c1;
@@ -506,148 +90,10 @@ use db;
 [INFO] Database changed.
 !info
 
-# should contain func3 and func4 now
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+# should show func3 and func4 now
+show user functions;
 func3
 func4
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 # test create function with database name
@@ -663,148 +109,10 @@ use `default`;
 [INFO] Database changed.
 !info
 
-# should contain func5 and func6
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+# should show func5 and func6
+show user functions;
 func5
 func6
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 # ==========================================================================
@@ -836,148 +144,10 @@ drop function if exists non_func;
 !info
 
 # should contain func11, not contain func10
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
+show user functions;
 func11
 func3
 func4
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
 
 # ==========================================================================
@@ -1017,145 +187,6 @@ create function lowerudf AS 'LowerUDF';
 [INFO] Function has been created.
 !info
 
-# should contain lowerudf function
-show functions;
-IFNULL
-TYPEOF
-abs
-acos
-and
-array
-as
-asc
-asin
-at
-atan
-atan2
-avg
-between
-bin
-cardinality
-cast
-ceil
-charLength
-collect
-concat
-concat_ws
-cos
-cosh
-cot
-count
-currentDate
-currentRange
-currentRow
-currentTime
-currentTimestamp
-dateFormat
-degrees
-desc
-distinct
-divide
-e
-element
-end
-equals
-exp
-extract
-flatten
-floor
-fromBase64
-get
-greaterThan
-greaterThanOrEqual
-hex
-ifThenElse
-in
-initCap
-isFalse
-isNotFalse
-isNotNull
-isNotTrue
-isNull
-isTrue
-lessThan
-lessThanOrEqual
-like
-ln
-localTime
-localTimestamp
-log
-log10
-log2
-lower
-lowerCase
+show user functions;
 lowerudf
-lpad
-ltrim
-map
-max
-md5
-min
-minus
-minusPrefix
-mod
-not
-notBetween
-notEquals
-or
-over
-overlay
-pi
-plus
-position
-power
-proctime
-radians
-rand
-randInteger
-rangeTo
-regexpExtract
-regexpReplace
-reinterpretCast
-repeat
-replace
-round
-row
-rowtime
-rpad
-rtrim
-sha1
-sha2
-sha224
-sha256
-sha384
-sha512
-sign
-similar
-sin
-sinh
-sqrt
-start
-stddevPop
-stddevSamp
-streamRecordTimestamp
-substring
-sum
-sum0
-tan
-tanh
-temporalOverlaps
-times
-timestampDiff
-toBase64
-trim
-truncate
-unboundedRange
-unboundedRow
-upper
-upperCase
-uuid
-varPop
-varSamp
-withColumns
-withoutColumns
 !ok
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/misc.q 
b/flink-table/flink-sql-client/src/test/resources/sql/misc.q
index af81b1a..471f61c 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/misc.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/misc.q
@@ -33,7 +33,7 @@ QUIT          Quits the SQL CLI client.
 RESET          Resets all session configuration properties.
 SELECT         Executes a SQL SELECT query on the Flink cluster.
 SET            Sets a session configuration property. Syntax: 'SET 
<key>=<value>;'. Use 'SET;' for listing all properties.
-SHOW FUNCTIONS         Shows all user-defined and built-in functions.
+SHOW FUNCTIONS         Shows all user-defined and built-in functions or only 
user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
 SHOW TABLES            Shows all registered tables.
 SOURCE         Reads a SQL SELECT query from a file and executes it on the 
Flink cluster.
 USE CATALOG            Sets the current catalog. The current database is set 
to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 2c1fee7..4866eb0 100644
--- 
a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ 
b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1035,18 +1035,22 @@ SqlDrop SqlDropFunction(Span s, boolean replace) :
 }
 
 /**
- * Hive syntax:
- *
- * SHOW FUNCTIONS [LIKE "<pattern>"];
- */
+* Parses a show functions statement.
+* SHOW [USER] FUNCTIONS;
+*/
 SqlShowFunctions SqlShowFunctions() :
 {
     SqlParserPos pos;
+    boolean requireUser = false;
 }
 {
-    <SHOW> <FUNCTIONS> { pos = getPos();}
+    <SHOW> { pos = getPos();}
+    [
+        <USER> { requireUser = true; }
+    ]
+    <FUNCTIONS>
     {
-        return new SqlShowFunctions(pos, null);
+        return new SqlShowFunctions(pos.plus(getPos()), requireUser);
     }
 }
 
diff --git 
a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
 
b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 74ea329..0f5441f 100644
--- 
a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -250,8 +250,8 @@ public class FlinkHiveSqlParserImplTest extends 
SqlParserTest {
 
     @Test
     public void testShowFunctions() {
-        // TODO: support SHOW FUNCTIONS LIKE 'regex_pattern'
         sql("show functions").ok("SHOW FUNCTIONS");
+        sql("show user functions").ok("SHOW USER FUNCTIONS");
     }
 
     @Test
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 2c17b30..ab8af2f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -368,16 +368,23 @@ SqlAlterFunction SqlAlterFunction() :
     }
 }
 
+/**
+* Parses a show functions statement.
+* SHOW [USER] FUNCTIONS;
+*/
 SqlShowFunctions SqlShowFunctions() :
 {
-    SqlIdentifier database = null;
     SqlParserPos pos;
+    boolean requireUser = false;
 }
 {
-    <SHOW> <FUNCTIONS> { pos = getPos();}
-    [database = CompoundIdentifier()]
+    <SHOW> { pos = getPos();}
+    [
+        <USER> { requireUser = true; }
+    ]
+    <FUNCTIONS>
     {
-        return new SqlShowFunctions(pos, database);
+        return new SqlShowFunctions(pos.plus(getPos()), requireUser);
     }
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowFunctions.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowFunctions.java
index 2f448f5..8447984 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowFunctions.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowFunctions.java
@@ -19,28 +19,27 @@
 package org.apache.flink.sql.parser.dql;
 
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 
+import java.util.Collections;
 import java.util.List;
 
-/** SHOW FUNCTION Sql Call. */
+/** SHOW [USER] FUNCTIONS Sql Call. */
 public class SqlShowFunctions extends SqlCall {
 
     public static final SqlSpecialOperator OPERATOR =
             new SqlSpecialOperator("SHOW FUNCTIONS", SqlKind.OTHER);
 
-    private final SqlIdentifier databaseName;
+    private final boolean requireUser;
 
-    public SqlShowFunctions(SqlParserPos pos, SqlIdentifier database) {
+    public SqlShowFunctions(SqlParserPos pos, boolean requireUser) {
         super(pos);
-        this.databaseName = database;
+        this.requireUser = requireUser;
     }
 
     @Override
@@ -50,18 +49,19 @@ public class SqlShowFunctions extends SqlCall {
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(databaseName);
+        return Collections.EMPTY_LIST;
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("SHOW FUNCTIONS");
-        if (databaseName != null) {
-            databaseName.unparse(writer, leftPrec, rightPrec);
+        if (requireUser) {
+            writer.keyword("SHOW USER FUNCTIONS");
+        } else {
+            writer.keyword("SHOW FUNCTIONS");
         }
     }
 
-    public String[] getDatabasePath() {
-        return databaseName.names.toArray(new String[0]);
+    public boolean requireUser() {
+        return requireUser;
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 6c8e18b..7c5675d 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -177,8 +177,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     @Test
     public void testShowFunctions() {
         sql("show functions").ok("SHOW FUNCTIONS");
-        sql("show functions db1").ok("SHOW FUNCTIONS `DB1`");
-        sql("show functions catalog1.db1").ok("SHOW FUNCTIONS 
`CATALOG1`.`DB1`");
+        sql("show user functions").ok("SHOW USER FUNCTIONS");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index dd1d562..1c0f628 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -188,7 +188,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
             "Unsupported SQL query! executeSql() only accepts a single SQL 
statement of type "
                     + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, 
DROP DATABASE, ALTER DATABASE, "
                     + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE 
CATALOG, DROP CATALOG, "
-                    + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, 
SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, SHOW PARTITIONS"
+                    + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, 
SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS"
                     + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, 
LOAD MODULE, UNLOAD "
                     + "MODULE, USE MODULES, SHOW [FULL] MODULES.";
 
@@ -1110,7 +1110,24 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         } else if (operation instanceof ShowTablesOperation) {
             return buildShowResult("table name", listTables());
         } else if (operation instanceof ShowFunctionsOperation) {
-            return buildShowResult("function name", listFunctions());
+            ShowFunctionsOperation showFunctionsOperation = 
(ShowFunctionsOperation) operation;
+            String[] functionNames = null;
+            ShowFunctionsOperation.FunctionScope functionScope =
+                    showFunctionsOperation.getFunctionScope();
+            switch (functionScope) {
+                case USER:
+                    functionNames = listUserDefinedFunctions();
+                    break;
+                case ALL:
+                    functionNames = listFunctions();
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "SHOW FUNCTIONS with %s scope is not 
supported.",
+                                    functionScope));
+            }
+            return buildShowResult("function name", functionNames);
         } else if (operation instanceof ShowViewsOperation) {
             return buildShowResult("view name", listViews());
         } else if (operation instanceof ShowPartitionsOperation) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
index 00d3045..466b0e7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
@@ -18,11 +18,43 @@
 
 package org.apache.flink.table.operations;
 
-/** Operation to describe a SHOW FUNCTIONS statement. */
+/** Operation to describe a SHOW [USER] FUNCTIONS statement. */
 public class ShowFunctionsOperation implements ShowOperation {
 
+    /**
+     * Represent scope of function.
+     *
+     * <ul>
+     *   <li><b>USER</b> return only user-defined functions
+     *   <li><b>ALL</b> return all user-defined and built-in functions
+     * </ul>
+     */
+    public enum FunctionScope {
+        USER,
+        ALL
+    }
+
+    private final FunctionScope functionScope;
+
+    public ShowFunctionsOperation() {
+        // "SHOW FUNCTIONS" default is ALL scope
+        this.functionScope = FunctionScope.ALL;
+    }
+
+    public ShowFunctionsOperation(FunctionScope functionScope) {
+        this.functionScope = functionScope;
+    }
+
     @Override
     public String asSummaryString() {
-        return "SHOW FUNCTIONS";
+        if (functionScope == FunctionScope.ALL) {
+            return "SHOW FUNCTIONS";
+        } else {
+            return String.format("SHOW %s FUNCTIONS", functionScope);
+        }
+    }
+
+    public FunctionScope getFunctionScope() {
+        return functionScope;
     }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 7614268..62ad70f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -91,6 +91,7 @@ import 
org.apache.flink.table.operations.ShowCurrentCatalogOperation;
 import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
 import org.apache.flink.table.operations.ShowModulesOperation;
 import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
@@ -756,7 +757,8 @@ public class SqlToOperationConverter {
 
     /** Convert SHOW FUNCTIONS statement. */
     private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
-        return new ShowFunctionsOperation();
+        return new ShowFunctionsOperation(
+                sqlShowFunctions.requireUser() ? FunctionScope.USER : 
FunctionScope.ALL);
     }
 
     /** Convert SHOW PARTITIONS statement. */
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 2ae1d21..cc961fd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
 import org.apache.flink.table.operations.ShowModulesOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
@@ -375,6 +377,15 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
+    public void testShowFunctions() {
+        final String sql1 = "SHOW FUNCTIONS";
+        assertShowFunctions(sql1, sql1, FunctionScope.ALL);
+
+        final String sql2 = "SHOW USER FUNCTIONS";
+        assertShowFunctions(sql2, sql2, FunctionScope.USER);
+    }
+
+    @Test
     public void testCreateTable() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
@@ -1386,6 +1397,16 @@ public class SqlToOperationConverterTest {
         return testItem;
     }
 
+    private void assertShowFunctions(
+            String sql, String expectedSummary, FunctionScope expectedScope) {
+        Operation operation = parse(sql, SqlDialect.DEFAULT);
+        assert operation instanceof ShowFunctionsOperation;
+        final ShowFunctionsOperation showFunctionsOperation = 
(ShowFunctionsOperation) operation;
+
+        assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
+        assertEquals(expectedSummary, 
showFunctionsOperation.asSummaryString());
+    }
+
     private Operation parse(String sql, FlinkPlannerImpl planner, 
CalciteParser parser) {
         SqlNode node = parser.parse(sql);
         return SqlToOperationConverter.convert(planner, catalogManager, 
node).get();
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index b7c69c7..c978ce6 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -504,6 +504,18 @@ class TableEnvironmentTest {
     checkData(
       tableEnv.listFunctions().map(Row.of(_)).toList.asJava.iterator(),
       tableResult.collect())
+
+    val funcName = classOf[TestUDF].getName
+    val tableResult1 = tableEnv.executeSql(s"CREATE FUNCTION 
default_database.f1 AS '$funcName'")
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    val tableResult2 = tableEnv.executeSql("SHOW USER FUNCTIONS")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    assertEquals(
+      TableSchema.builder().field("function name", DataTypes.STRING()).build(),
+      tableResult2.getTableSchema)
+    checkData(
+      util.Arrays.asList(Row.of("f1")).iterator(),
+      tableResult2.collect())
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 230adf2..1602aaf 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -73,6 +73,7 @@ import 
org.apache.flink.table.operations.ShowCurrentCatalogOperation;
 import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
 import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
@@ -574,7 +575,8 @@ public class SqlToOperationConverter {
 
     /** Convert SHOW FUNCTIONS statement. */
     private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
-        return new ShowFunctionsOperation();
+        return new ShowFunctionsOperation(
+                sqlShowFunctions.requireUser() ? FunctionScope.USER : 
FunctionScope.ALL);
     }
 
     /** Convert CREATE VIEW statement. */
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0c730f0..79a7221 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -37,7 +37,8 @@ import org.apache.flink.table.functions.{AggregateFunction, 
ScalarFunction, Tabl
 import org.apache.flink.table.module.{Module, ModuleEntry, ModuleManager}
 import org.apache.flink.table.operations.ddl._
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
-import org.apache.flink.table.operations.{CatalogQueryOperation, 
TableSourceQueryOperation, _}
+import org.apache.flink.table.operations.{CatalogQueryOperation, 
ShowFunctionsOperation, TableSourceQueryOperation, _}
+import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope
 import org.apache.flink.table.planner.{ParserImpl, 
PlanningConfigurationBuilder}
 import org.apache.flink.table.sinks.{BatchSelectTableSink, BatchTableSink, 
OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, 
TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
@@ -770,8 +771,16 @@ abstract class TableEnvImpl(
         buildShowResult("current database name", 
Array(catalogManager.getCurrentDatabase))
       case _: ShowTablesOperation =>
         buildShowResult("table name", listTables())
-      case _: ShowFunctionsOperation =>
-        buildShowResult("function name", listFunctions())
+      case showFunctionsOperation: ShowFunctionsOperation =>
+        val functionScope = showFunctionsOperation.getFunctionScope()
+        val functionNames = functionScope match {
+          case FunctionScope.USER => listUserDefinedFunctions()
+          case FunctionScope.ALL => listFunctions()
+          case _ =>
+            throw new UnsupportedOperationException(
+              s"SHOW FUNCTIONS with $functionScope scope is not supported.")
+        }
+        buildShowResult("function name", functionNames)
       case createViewOperation: CreateViewOperation =>
         if (createViewOperation.isTemporary) {
           catalogManager.createTemporaryTable(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cca74d7..587bda3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -50,6 +50,8 @@ import 
org.apache.flink.table.expressions.resolver.ExpressionResolver.Expression
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -257,6 +259,15 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
+    public void testShowFunctions() {
+        final String sql1 = "SHOW FUNCTIONS";
+        assertShowFunctions(sql1, sql1, FunctionScope.ALL);
+
+        final String sql2 = "SHOW USER FUNCTIONS";
+        assertShowFunctions(sql2, sql2, FunctionScope.USER);
+    }
+
+    @Test
     public void testCreateTable() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
@@ -674,6 +685,16 @@ public class SqlToOperationConverterTest {
         return testItem;
     }
 
+    private void assertShowFunctions(
+            String sql, String expectedSummary, FunctionScope expectedScope) {
+        Operation operation = parse(sql, SqlDialect.DEFAULT);
+        assert operation instanceof ShowFunctionsOperation;
+        final ShowFunctionsOperation showFunctionsOperation = 
(ShowFunctionsOperation) operation;
+
+        assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
+        assertEquals(expectedSummary, 
showFunctionsOperation.asSummaryString());
+    }
+
     private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
         tableConfig.setSqlDialect(sqlDialect);
         return planningConfigurationBuilder.createCalciteParser();

Reply via email to