This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new def2f543809 [FLINK-28631][sql-gateway][hive] Support to GetFunctions 
in the HiveServer2Endpoint
def2f543809 is described below

commit def2f5438090779fa23862027de9b1c4db36b21f
Author: Shengkai <[email protected]>
AuthorDate: Sat Aug 6 22:44:01 2022 +0800

    [FLINK-28631][sql-gateway][hive] Support to GetFunctions in the 
HiveServer2Endpoint
    
    This closes #20479
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  23 +-
 .../table/endpoint/hive/HiveServer2Schemas.java    | 168 +++++++--------
 .../hive/util/OperationExecutorFactory.java        | 161 ++++++++++++--
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 233 ++++++++-------------
 .../flink/table/gateway/api/SqlGatewayService.java |  36 ++++
 .../table/gateway/api/results/FunctionInfo.java    |  83 ++++++++
 .../gateway/api/utils/MockedSqlGatewayService.java |  23 ++
 .../gateway/service/SqlGatewayServiceImpl.java     |  40 ++++
 .../gateway/service/context/SessionContext.java    |   4 +
 .../service/operation/OperationExecutor.java       |  71 ++++++-
 .../gateway/service/SqlGatewayServiceITCase.java   |  63 ++++++
 .../flink/table/catalog/FunctionCatalog.java       |  83 +++++---
 .../flink/table/functions/FunctionIdentifier.java  |   8 +
 13 files changed, 715 insertions(+), 281 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index edb5f1cebe9..2786c8996d9 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -126,6 +126,7 @@ import static 
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HI
 import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
@@ -518,7 +519,27 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetFunctionsResp GetFunctions(TGetFunctionsReq tGetFunctionsReq) 
throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetFunctionsResp resp = new TGetFunctionsResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetFunctionsReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            createGetFunctionsExecutor(
+                                    service,
+                                    sessionHandle,
+                                    tGetFunctionsReq.getCatalogName(),
+                                    tGetFunctionsReq.getSchemaName(),
+                                    tGetFunctionsReq.getFunctionName()));
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(
+                            sessionHandle, operationHandle, 
TOperationType.GET_FUNCTIONS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetFunctions.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index ad7743d1be1..d1879ef9062 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -30,96 +30,98 @@ public class HiveServer2Schemas {
 
     /** Schema for {@link HiveServer2Endpoint#GetCatalogs}. */
     public static final ResolvedSchema GET_CATALOGS_SCHEMA =
-            new ResolvedSchema(
-                    Collections.singletonList(
-                            Column.physical("TABLE_CAT", DataTypes.STRING())
-                                    .withComment("Catalog name. NULL if not 
applicable.")),
-                    Collections.emptyList(),
-                    null);
+            buildSchema(
+                    Column.physical("TABLE_CAT", DataTypes.STRING())
+                            .withComment("Catalog name. NULL if not 
applicable."));
 
     /** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
     public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
-            new ResolvedSchema(
-                    Arrays.asList(
-                            Column.physical("TABLE_SCHEMA", DataTypes.STRING())
-                                    .withComment("Schema name. NULL if not 
applicable."),
-                            Column.physical("TABLE_CAT", DataTypes.STRING())
-                                    .withComment("Catalog name. NULL if not 
applicable")),
-                    Collections.emptyList(),
-                    null);
+            buildSchema(
+                    Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+                            .withComment("Schema name. NULL if not 
applicable."),
+                    Column.physical("TABLE_CAT", DataTypes.STRING())
+                            .withComment("Catalog name. NULL if not 
applicable"));
 
     /** Schema for {@link HiveServer2Endpoint#GetTables}. */
     public static final ResolvedSchema GET_TABLES_SCHEMA =
-            new ResolvedSchema(
-                    Collections.unmodifiableList(
-                            Arrays.asList(
-                                    Column.physical("TABLE_CAT", 
DataTypes.STRING())
-                                            .withComment("Catalog name. NULL 
if not applicable."),
-                                    Column.physical("TABLE_SCHEMA", 
DataTypes.STRING())
-                                            .withComment("Schema name. NULL if 
not applicable."),
-                                    Column.physical("TABLE_NAME", 
DataTypes.STRING())
-                                            .withComment("Table name. NULL if 
not applicable."),
-                                    Column.physical("TABLE_TYPE", 
DataTypes.STRING())
-                                            .withComment(
-                                                    "The table type, e.g. 
\"TABLE\", \"VIEW\", etc."),
-                                    Column.physical("REMARKS", 
DataTypes.STRING())
-                                            .withComment("Comments about the 
table."),
-                                    Column.physical("TYPE_CAT", 
DataTypes.STRING())
-                                            .withComment("The types catalog."),
-                                    Column.physical("TYPE_SCHEM", 
DataTypes.STRING())
-                                            .withComment("The types schema."),
-                                    Column.physical("TYPE_NAME", 
DataTypes.STRING())
-                                            .withComment("Type name."),
-                                    
Column.physical("SELF_REFERENCING_COL_NAME", DataTypes.STRING())
-                                            .withComment(
-                                                    "Name of the designated 
\"identifier\" column of a typed table."),
-                                    Column.physical("REF_GENERATION", 
DataTypes.STRING())
-                                            .withComment(
-                                                    "Specifies how values in 
SELF_REFERENCING_COL_NAME are created."))),
-                    Collections.emptyList(),
-                    null);
+            buildSchema(
+                    Column.physical("TABLE_CAT", DataTypes.STRING())
+                            .withComment("Catalog name. NULL if not 
applicable."),
+                    Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+                            .withComment("Schema name. NULL if not 
applicable."),
+                    Column.physical("TABLE_NAME", DataTypes.STRING())
+                            .withComment("Table name. NULL if not 
applicable."),
+                    Column.physical("TABLE_TYPE", DataTypes.STRING())
+                            .withComment("The table type, e.g. \"TABLE\", 
\"VIEW\", etc."),
+                    Column.physical("REMARKS", DataTypes.STRING())
+                            .withComment("Comments about the table."),
+                    Column.physical("TYPE_CAT", DataTypes.STRING())
+                            .withComment("The types catalog."),
+                    Column.physical("TYPE_SCHEM", DataTypes.STRING())
+                            .withComment("The types schema."),
+                    Column.physical("TYPE_NAME", 
DataTypes.STRING()).withComment("Type name."),
+                    Column.physical("SELF_REFERENCING_COL_NAME", 
DataTypes.STRING())
+                            .withComment(
+                                    "Name of the designated \"identifier\" 
column of a typed table."),
+                    Column.physical("REF_GENERATION", DataTypes.STRING())
+                            .withComment(
+                                    "Specifies how values in 
SELF_REFERENCING_COL_NAME are created."));
+
+    /** Schema for {@link HiveServer2Endpoint#GetFunctions}. */
+    public static final ResolvedSchema GET_FUNCTIONS_SCHEMA =
+            buildSchema(
+                    Column.physical("FUNCTION_CAT", DataTypes.STRING())
+                            .withComment("Function catalog (may be null)"),
+                    Column.physical("FUNCTION_SCHEM", DataTypes.STRING())
+                            .withComment("Function schema (may be null)"),
+                    Column.physical("FUNCTION_NAME", DataTypes.STRING())
+                            .withComment(
+                                    "Function name. This is the name used to 
invoke the function"),
+                    Column.physical("REMARKS", DataTypes.STRING())
+                            .withComment("Explanatory comment on the 
function"),
+                    Column.physical("FUNCTION_TYPE", DataTypes.INT())
+                            .withComment("Kind of function."),
+                    Column.physical("SPECIFIC_NAME", DataTypes.STRING())
+                            .withComment(
+                                    "The name which uniquely identifies this 
function within its schema"));
 
     /** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
     public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
-            new ResolvedSchema(
-                    Arrays.asList(
-                            Column.physical("TYPE_NAME", DataTypes.STRING())
-                                    .withComment("Type name."),
-                            Column.physical("DATA_TYPE", DataTypes.INT())
-                                    .withComment("SQL data type from 
java.sql.Types."),
-                            Column.physical("PRECISION", DataTypes.INT())
-                                    .withComment("Maximum precision."),
-                            Column.physical("LITERAL_PREFIX", 
DataTypes.STRING())
-                                    .withComment("Prefix used to quote a 
literal (may be null)."),
-                            Column.physical("LITERAL_SUFFIX", 
DataTypes.STRING())
-                                    .withComment("Suffix used to quote a 
literal (may be null)."),
-                            Column.physical("CREATE_PARAMS", 
DataTypes.STRING())
-                                    .withComment(
-                                            "Parameters used in creating the 
type (may be null)."),
-                            Column.physical("NULLABLE", DataTypes.SMALLINT())
-                                    .withComment("Can you use NULL for this 
type."),
-                            Column.physical("CASE_SENSITIVE", 
DataTypes.BOOLEAN())
-                                    .withComment("Is it case sensitive."),
-                            Column.physical("SEARCHABLE", DataTypes.SMALLINT())
-                                    .withComment("Can you use \"WHERE\" based 
on this type."),
-                            Column.physical("UNSIGNED_ATTRIBUTE", 
DataTypes.BOOLEAN())
-                                    .withComment("Is it unsigned."),
-                            Column.physical("FIXED_PREC_SCALE", 
DataTypes.BOOLEAN())
-                                    .withComment("Can it be a money value."),
-                            Column.physical("AUTO_INCREMENT", 
DataTypes.BOOLEAN())
-                                    .withComment("Can it be used for an 
auto-increment value."),
-                            Column.physical("LOCAL_TYPE_NAME", 
DataTypes.STRING())
-                                    .withComment("Localized version of type 
name (may be null)."),
-                            Column.physical("MINIMUM_SCALE", 
DataTypes.SMALLINT())
-                                    .withComment("Minimum scale supported."),
-                            Column.physical("MAXIMUM_SCALE", 
DataTypes.SMALLINT())
-                                    .withComment("Maximum scale supported."),
-                            Column.physical("SQL_DATA_TYPE", DataTypes.INT())
-                                    .withComment("Unused."),
-                            Column.physical("SQL_DATETIME_SUB", 
DataTypes.INT())
-                                    .withComment("Unused."),
-                            Column.physical("NUM_PREC_RADIX", DataTypes.INT())
-                                    .withComment("Usually 2 or 10.")),
-                    Collections.emptyList(),
-                    null);
+            buildSchema(
+                    Column.physical("TYPE_NAME", 
DataTypes.STRING()).withComment("Type name."),
+                    Column.physical("DATA_TYPE", DataTypes.INT())
+                            .withComment("SQL data type from java.sql.Types."),
+                    Column.physical("PRECISION", 
DataTypes.INT()).withComment("Maximum precision."),
+                    Column.physical("LITERAL_PREFIX", DataTypes.STRING())
+                            .withComment("Prefix used to quote a literal (may 
be null)."),
+                    Column.physical("LITERAL_SUFFIX", DataTypes.STRING())
+                            .withComment("Suffix used to quote a literal (may 
be null)."),
+                    Column.physical("CREATE_PARAMS", DataTypes.STRING())
+                            .withComment("Parameters used in creating the type 
(may be null)."),
+                    Column.physical("NULLABLE", DataTypes.SMALLINT())
+                            .withComment("Can you use NULL for this type."),
+                    Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN())
+                            .withComment("Is it case sensitive."),
+                    Column.physical("SEARCHABLE", DataTypes.SMALLINT())
+                            .withComment("Can you use \"WHERE\" based on this 
type."),
+                    Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN())
+                            .withComment("Is it unsigned."),
+                    Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN())
+                            .withComment("Can it be a money value."),
+                    Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN())
+                            .withComment("Can it be used for an auto-increment 
value."),
+                    Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING())
+                            .withComment("Localized version of type name (may 
be null)."),
+                    Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT())
+                            .withComment("Minimum scale supported."),
+                    Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT())
+                            .withComment("Maximum scale supported."),
+                    Column.physical("SQL_DATA_TYPE", 
DataTypes.INT()).withComment("Unused."),
+                    Column.physical("SQL_DATETIME_SUB", 
DataTypes.INT()).withComment("Unused."),
+                    Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+                            .withComment("Usually 2 or 10."));
+
+    private static ResolvedSchema buildSchema(Column... columns) {
+        return new ResolvedSchema(Arrays.asList(columns), 
Collections.emptyList(), null);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index 44095b1c24a..d6397700e13 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -19,9 +19,23 @@
 package org.apache.flink.table.endpoint.hive.util;
 
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.hive.HiveFunction;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -30,8 +44,10 @@ import org.apache.hadoop.hive.serde2.thrift.Type;
 
 import javax.annotation.Nullable;
 
+import java.sql.DatabaseMetaData;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -41,6 +57,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_FUNCTIONS_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
@@ -94,11 +111,20 @@ public class OperationExecutorFactory {
                         service, sessionHandle, catalogName, schemaName, 
tableName, tableKinds);
     }
 
+    public static Callable<ResultSet> createGetFunctionsExecutor(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String databasePattern,
+            @Nullable String functionNamePattern) {
+        return () ->
+                executeGetFunctions(
+                        service, sessionHandle, catalogName, databasePattern, 
functionNamePattern);
+    }
+
     public static Callable<ResultSet> createGetTableInfoExecutor() {
         return () ->
-                new ResultSet(
-                        EOS,
-                        null,
+                buildResultSet(
                         GET_TYPE_INFO_SCHEMA,
                         getSupportedHiveType().stream()
                                 .map(
@@ -134,9 +160,7 @@ public class OperationExecutorFactory {
     private static ResultSet executeGetCatalogs(
             SqlGatewayService service, SessionHandle sessionHandle) {
         Set<String> catalogNames = service.listCatalogs(sessionHandle);
-        return new ResultSet(
-                EOS,
-                null,
+        return buildResultSet(
                 GET_CATALOGS_SCHEMA,
                 catalogNames.stream()
                         .map(OperationExecutorFactory::wrap)
@@ -152,9 +176,7 @@ public class OperationExecutorFactory {
                 isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
         Set<String> databaseNames =
                 filter(service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
-        return new ResultSet(
-                EOS,
-                null,
+        return buildResultSet(
                 GET_SCHEMAS_SCHEMA,
                 databaseNames.stream()
                         .map(name -> wrap(name, specifiedCatalogName))
@@ -180,9 +202,7 @@ public class OperationExecutorFactory {
                             candidate -> 
candidate.getIdentifier().getObjectName(),
                             tableName));
         }
-        return new ResultSet(
-                EOS,
-                null,
+        return buildResultSet(
                 GET_TABLES_SCHEMA,
                 tableInfos.stream()
                         .map(
@@ -203,6 +223,77 @@ public class OperationExecutorFactory {
                         .collect(Collectors.toList()));
     }
 
+    private static ResultSet executeGetFunctions(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String databasePattern,
+            @Nullable String functionNamePattern) {
+        String specifiedCatalogName =
+                isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
+
+        Set<FunctionInfo> candidates = new HashSet<>();
+        // Add user defined functions
+        for (String databaseName :
+                filter(
+                        service.listDatabases(sessionHandle, 
specifiedCatalogName),
+                        databasePattern)) {
+            candidates.addAll(
+                    service.listUserDefinedFunctions(
+                            sessionHandle, specifiedCatalogName, 
databaseName));
+        }
+        // Add system functions
+        if (isNullOrEmpty(catalogName) && isNullOrEmpty(databasePattern)) {
+            candidates.addAll(service.listSystemFunctions(sessionHandle));
+        }
+        // Filter out unmatched functions
+        Set<FunctionInfo> matchedFunctions =
+                filter(
+                        candidates,
+                        candidate -> 
candidate.getIdentifier().getFunctionName(),
+                        functionNamePattern);
+        return buildResultSet(
+                GET_FUNCTIONS_SCHEMA,
+                // Sort
+                matchedFunctions.stream()
+                        .sorted(
+                                Comparator.comparing(
+                                        info ->
+                                                info.getIdentifier()
+                                                        .asSummaryString()
+                                                        .toLowerCase()))
+                        .map(
+                                info ->
+                                        wrap(
+                                                info.getIdentifier()
+                                                        .getIdentifier()
+                                                        
.map(ObjectIdentifier::getCatalogName)
+                                                        .orElse(null), // 
FUNCTION_CAT
+                                                info.getIdentifier()
+                                                        .getIdentifier()
+                                                        
.map(ObjectIdentifier::getDatabaseName)
+                                                        .orElse(null), // 
FUNCTION_SCHEM
+                                                info.getIdentifier()
+                                                        .getFunctionName(), // 
FUNCTION_NAME
+                                                "", // REMARKS
+                                                info.getKind()
+                                                        .map(
+                                                                
OperationExecutorFactory
+                                                                        
::toFunctionResult)
+                                                        .orElse(
+                                                                
DatabaseMetaData
+                                                                        
.functionResultUnknown), // FUNCTION_TYPE
+                                                // TODO: remove until Catalog 
listFunctions return
+                                                // CatalogFunction
+                                                getFunctionName(
+                                                        
service.getFunctionDefinition(
+                                                                sessionHandle,
+                                                                
UnresolvedIdentifier.of(
+                                                                        
info.getIdentifier()
+                                                                               
 .toList()))))) // SPECIFIC_NAME
+                        .collect(Collectors.toList()));
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utilities
     // 
--------------------------------------------------------------------------------------------
@@ -270,6 +361,10 @@ public class OperationExecutorFactory {
         return GenericRowData.of(pack);
     }
 
+    private static ResultSet buildResultSet(ResolvedSchema schema, 
List<RowData> data) {
+        return new ResultSet(EOS, null, schema, data);
+    }
+
     private static List<Type> getSupportedHiveType() {
         return Collections.unmodifiableList(
                 Arrays.asList(
@@ -294,4 +389,46 @@ public class OperationExecutorFactory {
                         INTERVAL_YEAR_MONTH_TYPE,
                         INTERVAL_DAY_TIME_TYPE));
     }
+
+    private static int toFunctionResult(FunctionKind kind) {
+        switch (kind) {
+            case SCALAR:
+            case AGGREGATE:
+                return DatabaseMetaData.functionNoTable;
+            case TABLE:
+            case ASYNC_TABLE:
+            case TABLE_AGGREGATE:
+                return DatabaseMetaData.functionReturnsTable;
+            case OTHER:
+                return DatabaseMetaData.functionResultUnknown;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unknown function kind: %s.", kind));
+        }
+    }
+
+    private static String getFunctionName(FunctionDefinition definition) {
+        if (definition instanceof HiveFunction) {
+            return ((HiveFunction<?>) 
definition).getFunctionWrapper().getUDFClassName();
+        } else if (definition instanceof UserDefinedFunction) {
+            return ((UserDefinedFunction) definition).functionIdentifier();
+        } else if (definition instanceof TableFunctionDefinition) {
+            return ((TableFunctionDefinition) 
definition).getTableFunction().functionIdentifier();
+        } else if (definition instanceof ScalarFunctionDefinition) {
+            return ((ScalarFunctionDefinition) 
definition).getScalarFunction().functionIdentifier();
+        } else if (definition instanceof AggregateFunctionDefinition) {
+            return ((AggregateFunctionDefinition) definition)
+                    .getAggregateFunction()
+                    .functionIdentifier();
+        } else if (definition instanceof TableAggregateFunctionDefinition) {
+            return ((TableAggregateFunctionDefinition) definition)
+                    .getTableAggregateFunction()
+                    .functionIdentifier();
+        } else if (definition instanceof BuiltInFunctionDefinition) {
+            BuiltInFunctionDefinition builtIn = (BuiltInFunctionDefinition) 
definition;
+            return 
builtIn.getRuntimeClass().orElse(definition.getClass().getCanonicalName());
+        } else {
+            return definition.getClass().getCanonicalName();
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 9a82e1d5910..ab02e770e36 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -250,116 +251,16 @@ public class HiveServer2EndpointITCase extends 
TestLogger {
                                         new String[] {"MANAGED_TABLE", 
"VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_1",
-                                "TABLE",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_2",
-                                "TABLE",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_3",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_4",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "tbl_1",
-                                "TABLE",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "diff_1",
-                                "TABLE",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "tbl_2",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "diff_2",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_diff",
-                                "tbl_1",
-                                "TABLE",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_diff",
-                                "tbl_2",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null)));
+                        Arrays.asList("default_catalog", "db_test1", "tbl_1", 
"TABLE", ""),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"TABLE", ""),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_1", 
"TABLE", ""),
+                        Arrays.asList("default_catalog", "db_test2", "diff_1", 
"TABLE", ""),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_test2", "diff_2", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_diff", "tbl_1", 
"TABLE", ""),
+                        Arrays.asList("default_catalog", "db_diff", "tbl_2", 
"VIEW", "")));
     }
 
     @Test
@@ -375,39 +276,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                         new String[] {"VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_3",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test1",
-                                "tbl_4",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "tbl_2",
-                                "VIEW",
-                                "",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null)));
+                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW", ""),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW", "")));
     }
 
     @Test
@@ -444,6 +315,70 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                 "INTERVAL_DAY_TIME")));
     }
 
+    @Test
+    public void testGetFunctions() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getFunctions(null, 
null, ".*"),
+                ResolvedSchema.of(
+                        Column.physical("FUNCTION_CAT", DataTypes.STRING()),
+                        Column.physical("FUNCTION_SCHEM", DataTypes.STRING()),
+                        Column.physical("FUNCTION_NAME", DataTypes.STRING()),
+                        Column.physical("REMARKS", DataTypes.STRING()),
+                        Column.physical("FUNCTION_TYPE", DataTypes.INT()),
+                        Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
+                compactedResult ->
+                        assertThat(compactedResult)
+                                .contains(
+                                        Arrays.asList(
+                                                "withColumns",
+                                                "",
+                                                0,
+                                                
"org.apache.flink.table.functions.BuiltInFunctionDefinition"),
+                                        Arrays.asList(
+                                                "bin",
+                                                "",
+                                                1,
+                                                
"org.apache.hadoop.hive.ql.udf.UDFBin"),
+                                        Arrays.asList(
+                                                "parse_url_tuple",
+                                                "",
+                                                2,
+                                                
"org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple")));
+    }
+
+    @Test
+    public void testGetFunctionWithPattern() throws Exception {
+        runGetObjectTest(
+                connection -> {
+                    try (Statement statement = connection.createStatement()) {
+                        statement.execute(
+                                String.format(
+                                        "CREATE FUNCTION 
`default_catalog`.`db_test2`.`my_abs` as '%s'",
+                                        JavaFunc0.class.getName()));
+                        statement.execute(
+                                String.format(
+                                        "CREATE FUNCTION 
`default_catalog`.`db_diff`.`your_abs` as '%s'",
+                                        JavaFunc0.class.getName()));
+                    }
+                    return 
connection.getMetaData().getFunctions("default_catalog", "db.*", "my.*");
+                },
+                ResolvedSchema.of(
+                        Column.physical("FUNCTION_CAT", DataTypes.STRING()),
+                        Column.physical("FUNCTION_SCHEM", DataTypes.STRING()),
+                        Column.physical("FUNCTION_NAME", DataTypes.STRING()),
+                        Column.physical("REMARKS", DataTypes.STRING()),
+                        Column.physical("FUNCTION_TYPE", DataTypes.INT()),
+                        Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
+                Collections.singletonList(
+                        Arrays.asList(
+                                "default_catalog",
+                                "db_test2",
+                                "my_abs",
+                                "",
+                                0,
+                                JavaFunc0.class.getName())));
+    }
+
     @Test
     public void testGetInfo() throws Exception {
         try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
@@ -508,7 +443,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
         try (Connection connection = getInitializedConnection();
                 java.sql.ResultSet result = 
resultSetSupplier.apply(connection)) {
             assertSchemaEquals(expectedSchema, result.getMetaData());
-            validator.accept(collect(result, expectedSchema.getColumnCount()));
+            validator.accept(collectAndCompact(result, 
expectedSchema.getColumnCount()));
         }
     }
 
@@ -608,12 +543,18 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
         }
     }
 
-    private Set<List<Object>> collect(java.sql.ResultSet result, int 
columnCount) throws Exception {
+    private Set<List<Object>> collectAndCompact(java.sql.ResultSet result, int 
columnCount)
+            throws Exception {
         List<List<Object>> actual = new ArrayList<>();
         while (result.next()) {
             List<Object> row = new ArrayList<>();
             for (int i = 1; i <= columnCount; i++) {
-                row.add(result.getObject(i));
+                Object value = result.getObject(i);
+                // ignore the null value for better presentation
+                if (value == null) {
+                    continue;
+                }
+                row.add(value);
             }
             actual.add(row);
         }
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 1fd7e339c8d..f12c44250c4 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -22,10 +22,13 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -234,6 +237,39 @@ public interface SqlGatewayService {
             Set<TableKind> tableKinds)
             throws SqlGatewayException;
 
+    /**
+     * List all user defined functions.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param catalogName name string of the given catalog.
+     * @param databaseName name string of the given database.
+     * @return user defined functions info.
+     */
+    Set<FunctionInfo> listUserDefinedFunctions(
+            SessionHandle sessionHandle, String catalogName, String 
databaseName)
+            throws SqlGatewayException;
+
+    /**
+     * List all available system functions.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @return system functions info.
+     */
+    Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) throws 
SqlGatewayException;
+
+    /**
+     * Get the specific definition of the function. If the input identifier 
only contains the
+     * function name, it is resolved with the order of the temporary system 
function, system
+     * function, temporary function and catalog function.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param functionIdentifier identifier of the function.
+     * @return the definition of the function.
+     */
+    FunctionDefinition getFunctionDefinition(
+            SessionHandle sessionHandle, UnresolvedIdentifier 
functionIdentifier)
+            throws SqlGatewayException;
+
     // 
-------------------------------------------------------------------------------------------
     // Utilities
     // 
-------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
new file mode 100644
index 00000000000..7975235f04c
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Info to describe the function. It is not equivalent to the {@link 
FunctionDefinition} that needs
+ * to load the implementation, which may require to download the jar from the 
remote to the local
+ * machine and load the classes. Comparing to the {@link FunctionDefinition}, 
the {@link
+ * FunctionInfo} return the available information in the current state, which 
is much lighter.
+ */
+@PublicEvolving
+public class FunctionInfo {
+
+    /** Identifier of the function. */
+    private final FunctionIdentifier identifier;
+    /** Kind of the function. If the value is null, it means kind of the 
function is unresolved. */
+    private final @Nullable FunctionKind kind;
+
+    public FunctionInfo(FunctionIdentifier identifier) {
+        this(identifier, null);
+    }
+
+    public FunctionInfo(FunctionIdentifier identifier, @Nullable FunctionKind 
kind) {
+        this.identifier = identifier;
+        this.kind = kind;
+    }
+
+    public FunctionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    public Optional<FunctionKind> getKind() {
+        return Optional.ofNullable(kind);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FunctionInfo)) {
+            return false;
+        }
+        FunctionInfo that = (FunctionInfo) o;
+        return Objects.equals(identifier, that.identifier) && kind == 
that.kind;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifier, kind);
+    }
+
+    @Override
+    public String toString() {
+        return "FunctionInfo{identifier=" + identifier + ", kind=" + kind + 
'}';
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 8f0b914c223..8508edbd23a 100644
--- 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -21,10 +21,13 @@ package org.apache.flink.table.gateway.api.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -143,6 +146,26 @@ public class MockedSqlGatewayService implements 
SqlGatewayService {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public Set<FunctionInfo> listUserDefinedFunctions(
+            SessionHandle sessionHandle, String catalogName, String 
databaseName)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition(
+            SessionHandle sessionHandle, UnresolvedIdentifier 
functionIdentifier)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public GatewayInfo getGatewayInfo() {
         throw new UnsupportedOperationException();
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 3b39430301b..8114fc5b79a 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -22,10 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -252,6 +255,43 @@ public class SqlGatewayServiceImpl implements 
SqlGatewayService {
         }
     }
 
+    @Override
+    public Set<FunctionInfo> listUserDefinedFunctions(
+            SessionHandle sessionHandle, String catalogName, String 
databaseName)
+            throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle)
+                    .createExecutor()
+                    .listUserDefinedFunctions(catalogName, databaseName);
+        } catch (Throwable t) {
+            LOG.error("Failed to listUserDefinedFunctions.", t);
+            throw new SqlGatewayException("Failed to 
listUserDefinedFunctions.", t);
+        }
+    }
+
+    public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) {
+        try {
+            return 
getSession(sessionHandle).createExecutor().listSystemFunctions();
+        } catch (Throwable t) {
+            LOG.error("Failed to listSystemFunctions.", t);
+            throw new SqlGatewayException("Failed to listSystemFunctions.", t);
+        }
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition(
+            SessionHandle sessionHandle, UnresolvedIdentifier 
functionIdentifier)
+            throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle)
+                    .createExecutor()
+                    .getFunctionDefinition(functionIdentifier);
+        } catch (Throwable t) {
+            LOG.error("Failed to getFunctionDefinition.", t);
+            throw new SqlGatewayException("Failed to getFunctionDefinition.", 
t);
+        }
+    }
+
     @Override
     public GatewayInfo getGatewayInfo() {
         return GatewayInfo.INSTANCE;
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index bb70e8947c1..b949d299d22 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -120,6 +120,10 @@ public class SessionContext {
         return endpointVersion;
     }
 
+    public SessionState getSessionState() {
+        return sessionState;
+    }
+
     public void set(String key, String value) {
         try {
             // Test whether the key value will influence the creation of the 
Executor.
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 20e04ac3b75..87c4661f24d 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -29,10 +29,14 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.service.context.SessionContext;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
@@ -46,6 +50,9 @@ import 
org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,6 +72,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 /** An executor to execute the {@link Operation}. */
 public class OperationExecutor {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(OperationExecutor.class);
+
     private final SessionContext sessionContext;
     private final Configuration executionConfig;
 
@@ -111,17 +120,18 @@ public class OperationExecutor {
     }
 
     public String getCurrentCatalog() {
-        return getTableEnvironment().getCatalogManager().getCurrentCatalog();
+        return 
sessionContext.getSessionState().catalogManager.getCurrentCatalog();
     }
 
     public Set<String> listCatalogs() {
-        return getTableEnvironment().getCatalogManager().listCatalogs();
+        return sessionContext.getSessionState().catalogManager.listCatalogs();
     }
 
     public Set<String> listDatabases(String catalogName) {
         return new HashSet<>(
-                getTableEnvironment()
-                        .getCatalogManager()
+                sessionContext
+                        .getSessionState()
+                        .catalogManager
                         .getCatalog(catalogName)
                         .orElseThrow(
                                 () ->
@@ -146,6 +156,53 @@ public class OperationExecutor {
         }
     }
 
+    public Set<FunctionInfo> listUserDefinedFunctions(String catalogName, 
String databaseName) {
+        return sessionContext.getSessionState().functionCatalog
+                .getUserDefinedFunctions(catalogName, databaseName).stream()
+                // Load the CatalogFunction from the remote catalog is time 
wasted. Set the
+                // FunctionKind null.
+                .map(FunctionInfo::new)
+                .collect(Collectors.toSet());
+    }
+
+    public Set<FunctionInfo> listSystemFunctions() {
+        Set<FunctionInfo> info = new HashSet<>();
+        for (String functionName : 
sessionContext.getSessionState().moduleManager.listFunctions()) {
+            try {
+                info.add(
+                        sessionContext
+                                .getSessionState()
+                                .moduleManager
+                                .getFunctionDefinition(functionName)
+                                .map(
+                                        definition ->
+                                                new FunctionInfo(
+                                                        
FunctionIdentifier.of(functionName),
+                                                        definition.getKind()))
+                                .orElse(new 
FunctionInfo(FunctionIdentifier.of(functionName))));
+            } catch (Throwable t) {
+                // Failed to load the function. Ignore.
+                LOG.error(
+                        String.format("Failed to load the system function 
`%s`.", functionName), t);
+            }
+        }
+        return info;
+    }
+
+    public FunctionDefinition getFunctionDefinition(UnresolvedIdentifier 
identifier) {
+        return sessionContext
+                .getSessionState()
+                .functionCatalog
+                .lookupFunction(identifier)
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "Can not find the definition: 
%s.",
+                                                identifier.asSummaryString())))
+                .getDefinition();
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     @VisibleForTesting
@@ -226,7 +283,7 @@ public class OperationExecutor {
 
     private Set<TableInfo> listTables(
             String catalogName, String databaseName, boolean includeViews) {
-        CatalogManager catalogManager = 
getTableEnvironment().getCatalogManager();
+        CatalogManager catalogManager = 
sessionContext.getSessionState().catalogManager;
         Map<String, TableInfo> views = new HashMap<>();
         catalogManager
                 .listViews(catalogName, databaseName)
@@ -257,9 +314,9 @@ public class OperationExecutor {
     }
 
     private Set<TableInfo> listViews(String catalogName, String databaseName) {
-        CatalogManager catalogManager = 
getTableEnvironment().getCatalogManager();
         return Collections.unmodifiableSet(
-                catalogManager.listViews(catalogName, databaseName).stream()
+                
sessionContext.getSessionState().catalogManager.listViews(catalogName, 
databaseName)
+                        .stream()
                         .map(
                                 name ->
                                         new TableInfo(
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index ec370610edb..6447771a33d 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.service;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -31,8 +32,10 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -45,7 +48,10 @@ import 
org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
 import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.TableFunc0;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
@@ -73,6 +79,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static org.apache.flink.table.functions.FunctionKind.OTHER;
+import static org.apache.flink.table.functions.FunctionKind.SCALAR;
 import static 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
@@ -405,6 +414,60 @@ public class SqlGatewayServiceITCase extends 
AbstractTestBase {
                 .isEqualTo(Collections.emptySet());
     }
 
+    @Test
+    public void testListSystemFunctions() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat1", new 
GenericInMemoryCatalog("cat1"))
+                        .registerCatalog("cat2", new 
GenericInMemoryCatalog("cat2"))
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+
+        assertThat(service.listSystemFunctions(sessionHandle))
+                .contains(
+                        new FunctionInfo(FunctionIdentifier.of("sin"), SCALAR),
+                        new FunctionInfo(FunctionIdentifier.of("sum"), 
AGGREGATE),
+                        new FunctionInfo(FunctionIdentifier.of("as"), OTHER));
+    }
+
+    @Test
+    public void testListUserDefinedFunctions() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat1", new 
GenericInMemoryCatalog("cat1"))
+                        .registerCatalog("cat2", new 
GenericInMemoryCatalog("cat2"))
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+        TableEnvironment tEnv =
+                
service.getSession(sessionHandle).createExecutor().getTableEnvironment();
+        tEnv.createTemporarySystemFunction(
+                "count_distinct", 
JavaUserDefinedAggFunctions.CountDistinct.class);
+        tEnv.createFunction("java1", 
JavaUserDefinedScalarFunctions.JavaFunc1.class);
+        tEnv.createTemporaryFunction("table_func0", TableFunc0.class);
+
+        // register catalog function in another catalog
+        tEnv.createFunction(
+                "cat1.default.filter_out_function", 
JavaUserDefinedScalarFunctions.JavaFunc1.class);
+
+        assertThat(
+                        service.listUserDefinedFunctions(
+                                sessionHandle, "default_catalog", 
"default_database"))
+                .contains(
+                        new 
FunctionInfo(FunctionIdentifier.of("count_distinct")),
+                        new FunctionInfo(
+                                FunctionIdentifier.of(
+                                        ObjectIdentifier.of(
+                                                "default_catalog", 
"default_database", "java1"))),
+                        new FunctionInfo(
+                                FunctionIdentifier.of(
+                                        ObjectIdentifier.of(
+                                                "default_catalog",
+                                                "default_database",
+                                                "table_func0"))));
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Concurrent tests
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 75a5be4d0ff..50e51a6ce85 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -45,7 +45,6 @@ import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -290,7 +289,50 @@ public final class FunctionCatalog {
      * functions and catalog functions in the current catalog and current 
database.
      */
     public String[] getUserDefinedFunctions() {
-        return getUserDefinedFunctionNames().toArray(new String[0]);
+        return getUserDefinedFunctions(
+                        catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase())
+                .stream()
+                .map(FunctionIdentifier::getFunctionName)
+                .toArray(String[]::new);
+    }
+
+    /**
+     * Get names of all user including temp system functions, temp catalog * 
functions and catalog
+     * functions in the specified catalog and specified database.
+     */
+    public Set<FunctionIdentifier> getUserDefinedFunctions(
+            String catalogName, String databaseName) {
+        // add temp system functions
+        Set<FunctionIdentifier> result =
+                tempSystemFunctions.keySet().stream()
+                        .map(FunctionIdentifier::of)
+                        .collect(Collectors.toSet());
+
+        // add temp catalog functions
+        result.addAll(
+                tempCatalogFunctions.keySet().stream()
+                        .filter(
+                                oi ->
+                                        oi.getCatalogName().equals(catalogName)
+                                                && 
oi.getDatabaseName().equals(databaseName))
+                        .map(FunctionIdentifier::of)
+                        .collect(Collectors.toSet()));
+
+        // add catalog functions
+        Catalog catalog = catalogManager.getCatalog(catalogName).get();
+        try {
+            catalog.listFunctions(databaseName)
+                    .forEach(
+                            name ->
+                                    result.add(
+                                            FunctionIdentifier.of(
+                                                    ObjectIdentifier.of(
+                                                            catalogName, 
databaseName, name))));
+        } catch (DatabaseNotExistException e) {
+            // Ignore since there will always be a current database of the 
current catalog
+        }
+
+        return result;
     }
 
     /**
@@ -298,7 +340,13 @@ public final class FunctionCatalog {
      * functions and catalog functions in the current catalog and current 
database.
      */
     public String[] getFunctions() {
-        Set<String> result = getUserDefinedFunctionNames();
+        Set<String> result =
+                getUserDefinedFunctions(
+                                catalogManager.getCurrentCatalog(),
+                                catalogManager.getCurrentDatabase())
+                        .stream()
+                        .map(FunctionIdentifier::getFunctionName)
+                        .collect(Collectors.toSet());
 
         // add system functions
         result.addAll(moduleManager.listFunctions());
@@ -523,35 +571,6 @@ public final class FunctionCatalog {
 
     // 
--------------------------------------------------------------------------------------------
 
-    private Set<String> getUserDefinedFunctionNames() {
-
-        // add temp system functions
-        Set<String> result = new HashSet<>(tempSystemFunctions.keySet());
-
-        String currentCatalog = catalogManager.getCurrentCatalog();
-        String currentDatabase = catalogManager.getCurrentDatabase();
-
-        // add temp catalog functions
-        result.addAll(
-                tempCatalogFunctions.keySet().stream()
-                        .filter(
-                                oi ->
-                                        
oi.getCatalogName().equals(currentCatalog)
-                                                && 
oi.getDatabaseName().equals(currentDatabase))
-                        .map(ObjectIdentifier::getObjectName)
-                        .collect(Collectors.toSet()));
-
-        // add catalog functions
-        Catalog catalog = catalogManager.getCatalog(currentCatalog).get();
-        try {
-            result.addAll(catalog.listFunctions(currentDatabase));
-        } catch (DatabaseNotExistException e) {
-            // Ignore since there will always be a current database of the 
current catalog
-        }
-
-        return result;
-    }
-
     private Optional<ContextResolvedFunction> 
resolvePreciseFunctionReference(ObjectIdentifier oi) {
         // resolve order:
         // 1. Temporary functions
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
index 586bc3c14a6..f1da6cbee2a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
@@ -87,6 +87,14 @@ public final class FunctionIdentifier implements 
Serializable {
         return Optional.ofNullable(functionName);
     }
 
+    public String getFunctionName() {
+        if (objectIdentifier != null) {
+            return objectIdentifier.getObjectName();
+        } else {
+            return functionName;
+        }
+    }
+
     /** List of the component names of this function identifier. */
     public List<String> toList() {
         if (objectIdentifier != null) {

Reply via email to