This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new def2f543809 [FLINK-28631][sql-gateway][hive] Support to GetFunctions
in the HiveServer2Endpoint
def2f543809 is described below
commit def2f5438090779fa23862027de9b1c4db36b21f
Author: Shengkai <[email protected]>
AuthorDate: Sat Aug 6 22:44:01 2022 +0800
[FLINK-28631][sql-gateway][hive] Support to GetFunctions in the
HiveServer2Endpoint
This closes #20479
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 23 +-
.../table/endpoint/hive/HiveServer2Schemas.java | 168 +++++++--------
.../hive/util/OperationExecutorFactory.java | 161 ++++++++++++--
.../endpoint/hive/HiveServer2EndpointITCase.java | 233 ++++++++-------------
.../flink/table/gateway/api/SqlGatewayService.java | 36 ++++
.../table/gateway/api/results/FunctionInfo.java | 83 ++++++++
.../gateway/api/utils/MockedSqlGatewayService.java | 23 ++
.../gateway/service/SqlGatewayServiceImpl.java | 40 ++++
.../gateway/service/context/SessionContext.java | 4 +
.../service/operation/OperationExecutor.java | 71 ++++++-
.../gateway/service/SqlGatewayServiceITCase.java | 63 ++++++
.../flink/table/catalog/FunctionCatalog.java | 83 +++++---
.../flink/table/functions/FunctionIdentifier.java | 8 +
13 files changed, 715 insertions(+), 281 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index edb5f1cebe9..2786c8996d9 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -126,6 +126,7 @@ import static
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HI
import static
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
import static
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
@@ -518,7 +519,27 @@ public class HiveServer2Endpoint implements
TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetFunctionsResp GetFunctions(TGetFunctionsReq tGetFunctionsReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetFunctionsResp resp = new TGetFunctionsResp();
+ try {
+ SessionHandle sessionHandle =
toSessionHandle(tGetFunctionsReq.getSessionHandle());
+ OperationHandle operationHandle =
+ service.submitOperation(
+ sessionHandle,
+ createGetFunctionsExecutor(
+ service,
+ sessionHandle,
+ tGetFunctionsReq.getCatalogName(),
+ tGetFunctionsReq.getSchemaName(),
+ tGetFunctionsReq.getFunctionName()));
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(
+ sessionHandle, operationHandle,
TOperationType.GET_FUNCTIONS));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetFunctions.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index ad7743d1be1..d1879ef9062 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -30,96 +30,98 @@ public class HiveServer2Schemas {
/** Schema for {@link HiveServer2Endpoint#GetCatalogs}. */
public static final ResolvedSchema GET_CATALOGS_SCHEMA =
- new ResolvedSchema(
- Collections.singletonList(
- Column.physical("TABLE_CAT", DataTypes.STRING())
- .withComment("Catalog name. NULL if not
applicable.")),
- Collections.emptyList(),
- null);
+ buildSchema(
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Catalog name. NULL if not
applicable."));
/** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
- new ResolvedSchema(
- Arrays.asList(
- Column.physical("TABLE_SCHEMA", DataTypes.STRING())
- .withComment("Schema name. NULL if not
applicable."),
- Column.physical("TABLE_CAT", DataTypes.STRING())
- .withComment("Catalog name. NULL if not
applicable")),
- Collections.emptyList(),
- null);
+ buildSchema(
+ Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+ .withComment("Schema name. NULL if not
applicable."),
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Catalog name. NULL if not
applicable"));
/** Schema for {@link HiveServer2Endpoint#GetTables}. */
public static final ResolvedSchema GET_TABLES_SCHEMA =
- new ResolvedSchema(
- Collections.unmodifiableList(
- Arrays.asList(
- Column.physical("TABLE_CAT",
DataTypes.STRING())
- .withComment("Catalog name. NULL
if not applicable."),
- Column.physical("TABLE_SCHEMA",
DataTypes.STRING())
- .withComment("Schema name. NULL if
not applicable."),
- Column.physical("TABLE_NAME",
DataTypes.STRING())
- .withComment("Table name. NULL if
not applicable."),
- Column.physical("TABLE_TYPE",
DataTypes.STRING())
- .withComment(
- "The table type, e.g.
\"TABLE\", \"VIEW\", etc."),
- Column.physical("REMARKS",
DataTypes.STRING())
- .withComment("Comments about the
table."),
- Column.physical("TYPE_CAT",
DataTypes.STRING())
- .withComment("The types catalog."),
- Column.physical("TYPE_SCHEM",
DataTypes.STRING())
- .withComment("The types schema."),
- Column.physical("TYPE_NAME",
DataTypes.STRING())
- .withComment("Type name."),
-
Column.physical("SELF_REFERENCING_COL_NAME", DataTypes.STRING())
- .withComment(
- "Name of the designated
\"identifier\" column of a typed table."),
- Column.physical("REF_GENERATION",
DataTypes.STRING())
- .withComment(
- "Specifies how values in
SELF_REFERENCING_COL_NAME are created."))),
- Collections.emptyList(),
- null);
+ buildSchema(
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Catalog name. NULL if not
applicable."),
+ Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+ .withComment("Schema name. NULL if not
applicable."),
+ Column.physical("TABLE_NAME", DataTypes.STRING())
+ .withComment("Table name. NULL if not
applicable."),
+ Column.physical("TABLE_TYPE", DataTypes.STRING())
+ .withComment("The table type, e.g. \"TABLE\",
\"VIEW\", etc."),
+ Column.physical("REMARKS", DataTypes.STRING())
+ .withComment("Comments about the table."),
+ Column.physical("TYPE_CAT", DataTypes.STRING())
+ .withComment("The types catalog."),
+ Column.physical("TYPE_SCHEM", DataTypes.STRING())
+ .withComment("The types schema."),
+ Column.physical("TYPE_NAME",
DataTypes.STRING()).withComment("Type name."),
+ Column.physical("SELF_REFERENCING_COL_NAME",
DataTypes.STRING())
+ .withComment(
+ "Name of the designated \"identifier\"
column of a typed table."),
+ Column.physical("REF_GENERATION", DataTypes.STRING())
+ .withComment(
+ "Specifies how values in
SELF_REFERENCING_COL_NAME are created."));
+
+ /** Schema for {@link HiveServer2Endpoint#GetFunctions}. */
+ public static final ResolvedSchema GET_FUNCTIONS_SCHEMA =
+ buildSchema(
+ Column.physical("FUNCTION_CAT", DataTypes.STRING())
+ .withComment("Function catalog (may be null)"),
+ Column.physical("FUNCTION_SCHEM", DataTypes.STRING())
+ .withComment("Function schema (may be null)"),
+ Column.physical("FUNCTION_NAME", DataTypes.STRING())
+ .withComment(
+ "Function name. This is the name used to
invoke the function"),
+ Column.physical("REMARKS", DataTypes.STRING())
+ .withComment("Explanatory comment on the
function"),
+ Column.physical("FUNCTION_TYPE", DataTypes.INT())
+ .withComment("Kind of function."),
+ Column.physical("SPECIFIC_NAME", DataTypes.STRING())
+ .withComment(
+ "The name which uniquely identifies this
function within its schema"));
/** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
- new ResolvedSchema(
- Arrays.asList(
- Column.physical("TYPE_NAME", DataTypes.STRING())
- .withComment("Type name."),
- Column.physical("DATA_TYPE", DataTypes.INT())
- .withComment("SQL data type from
java.sql.Types."),
- Column.physical("PRECISION", DataTypes.INT())
- .withComment("Maximum precision."),
- Column.physical("LITERAL_PREFIX",
DataTypes.STRING())
- .withComment("Prefix used to quote a
literal (may be null)."),
- Column.physical("LITERAL_SUFFIX",
DataTypes.STRING())
- .withComment("Suffix used to quote a
literal (may be null)."),
- Column.physical("CREATE_PARAMS",
DataTypes.STRING())
- .withComment(
- "Parameters used in creating the
type (may be null)."),
- Column.physical("NULLABLE", DataTypes.SMALLINT())
- .withComment("Can you use NULL for this
type."),
- Column.physical("CASE_SENSITIVE",
DataTypes.BOOLEAN())
- .withComment("Is it case sensitive."),
- Column.physical("SEARCHABLE", DataTypes.SMALLINT())
- .withComment("Can you use \"WHERE\" based
on this type."),
- Column.physical("UNSIGNED_ATTRIBUTE",
DataTypes.BOOLEAN())
- .withComment("Is it unsigned."),
- Column.physical("FIXED_PREC_SCALE",
DataTypes.BOOLEAN())
- .withComment("Can it be a money value."),
- Column.physical("AUTO_INCREMENT",
DataTypes.BOOLEAN())
- .withComment("Can it be used for an
auto-increment value."),
- Column.physical("LOCAL_TYPE_NAME",
DataTypes.STRING())
- .withComment("Localized version of type
name (may be null)."),
- Column.physical("MINIMUM_SCALE",
DataTypes.SMALLINT())
- .withComment("Minimum scale supported."),
- Column.physical("MAXIMUM_SCALE",
DataTypes.SMALLINT())
- .withComment("Maximum scale supported."),
- Column.physical("SQL_DATA_TYPE", DataTypes.INT())
- .withComment("Unused."),
- Column.physical("SQL_DATETIME_SUB",
DataTypes.INT())
- .withComment("Unused."),
- Column.physical("NUM_PREC_RADIX", DataTypes.INT())
- .withComment("Usually 2 or 10.")),
- Collections.emptyList(),
- null);
+ buildSchema(
+ Column.physical("TYPE_NAME",
DataTypes.STRING()).withComment("Type name."),
+ Column.physical("DATA_TYPE", DataTypes.INT())
+ .withComment("SQL data type from java.sql.Types."),
+ Column.physical("PRECISION",
DataTypes.INT()).withComment("Maximum precision."),
+ Column.physical("LITERAL_PREFIX", DataTypes.STRING())
+ .withComment("Prefix used to quote a literal (may
be null)."),
+ Column.physical("LITERAL_SUFFIX", DataTypes.STRING())
+ .withComment("Suffix used to quote a literal (may
be null)."),
+ Column.physical("CREATE_PARAMS", DataTypes.STRING())
+ .withComment("Parameters used in creating the type
(may be null)."),
+ Column.physical("NULLABLE", DataTypes.SMALLINT())
+ .withComment("Can you use NULL for this type."),
+ Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN())
+ .withComment("Is it case sensitive."),
+ Column.physical("SEARCHABLE", DataTypes.SMALLINT())
+ .withComment("Can you use \"WHERE\" based on this
type."),
+ Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN())
+ .withComment("Is it unsigned."),
+ Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN())
+ .withComment("Can it be a money value."),
+ Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN())
+ .withComment("Can it be used for an auto-increment
value."),
+ Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING())
+ .withComment("Localized version of type name (may
be null)."),
+ Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT())
+ .withComment("Minimum scale supported."),
+ Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT())
+ .withComment("Maximum scale supported."),
+ Column.physical("SQL_DATA_TYPE",
DataTypes.INT()).withComment("Unused."),
+ Column.physical("SQL_DATETIME_SUB",
DataTypes.INT()).withComment("Unused."),
+ Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+ .withComment("Usually 2 or 10."));
+
+ private static ResolvedSchema buildSchema(Column... columns) {
+ return new ResolvedSchema(Arrays.asList(columns),
Collections.emptyList(), null);
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index 44095b1c24a..d6397700e13 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -19,9 +19,23 @@
package org.apache.flink.table.endpoint.hive.util;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.hive.HiveFunction;
import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -30,8 +44,10 @@ import org.apache.hadoop.hive.serde2.thrift.Type;
import javax.annotation.Nullable;
+import java.sql.DatabaseMetaData;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -41,6 +57,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_FUNCTIONS_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
@@ -94,11 +111,20 @@ public class OperationExecutorFactory {
service, sessionHandle, catalogName, schemaName,
tableName, tableKinds);
}
+ public static Callable<ResultSet> createGetFunctionsExecutor(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String databasePattern,
+ @Nullable String functionNamePattern) {
+ return () ->
+ executeGetFunctions(
+ service, sessionHandle, catalogName, databasePattern,
functionNamePattern);
+ }
+
public static Callable<ResultSet> createGetTableInfoExecutor() {
return () ->
- new ResultSet(
- EOS,
- null,
+ buildResultSet(
GET_TYPE_INFO_SCHEMA,
getSupportedHiveType().stream()
.map(
@@ -134,9 +160,7 @@ public class OperationExecutorFactory {
private static ResultSet executeGetCatalogs(
SqlGatewayService service, SessionHandle sessionHandle) {
Set<String> catalogNames = service.listCatalogs(sessionHandle);
- return new ResultSet(
- EOS,
- null,
+ return buildResultSet(
GET_CATALOGS_SCHEMA,
catalogNames.stream()
.map(OperationExecutorFactory::wrap)
@@ -152,9 +176,7 @@ public class OperationExecutorFactory {
isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
Set<String> databaseNames =
filter(service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
- return new ResultSet(
- EOS,
- null,
+ return buildResultSet(
GET_SCHEMAS_SCHEMA,
databaseNames.stream()
.map(name -> wrap(name, specifiedCatalogName))
@@ -180,9 +202,7 @@ public class OperationExecutorFactory {
candidate ->
candidate.getIdentifier().getObjectName(),
tableName));
}
- return new ResultSet(
- EOS,
- null,
+ return buildResultSet(
GET_TABLES_SCHEMA,
tableInfos.stream()
.map(
@@ -203,6 +223,77 @@ public class OperationExecutorFactory {
.collect(Collectors.toList()));
}
+ private static ResultSet executeGetFunctions(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String databasePattern,
+ @Nullable String functionNamePattern) {
+ String specifiedCatalogName =
+ isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
+
+ Set<FunctionInfo> candidates = new HashSet<>();
+ // Add user defined functions
+ for (String databaseName :
+ filter(
+ service.listDatabases(sessionHandle,
specifiedCatalogName),
+ databasePattern)) {
+ candidates.addAll(
+ service.listUserDefinedFunctions(
+ sessionHandle, specifiedCatalogName,
databaseName));
+ }
+ // Add system functions
+ if (isNullOrEmpty(catalogName) && isNullOrEmpty(databasePattern)) {
+ candidates.addAll(service.listSystemFunctions(sessionHandle));
+ }
+ // Filter out unmatched functions
+ Set<FunctionInfo> matchedFunctions =
+ filter(
+ candidates,
+ candidate ->
candidate.getIdentifier().getFunctionName(),
+ functionNamePattern);
+ return buildResultSet(
+ GET_FUNCTIONS_SCHEMA,
+ // Sort
+ matchedFunctions.stream()
+ .sorted(
+ Comparator.comparing(
+ info ->
+ info.getIdentifier()
+ .asSummaryString()
+ .toLowerCase()))
+ .map(
+ info ->
+ wrap(
+ info.getIdentifier()
+ .getIdentifier()
+
.map(ObjectIdentifier::getCatalogName)
+ .orElse(null), //
FUNCTION_CAT
+ info.getIdentifier()
+ .getIdentifier()
+
.map(ObjectIdentifier::getDatabaseName)
+ .orElse(null), //
FUNCTION_SCHEM
+ info.getIdentifier()
+ .getFunctionName(), //
FUNCTION_NAME
+ "", // REMARKS
+ info.getKind()
+ .map(
+
OperationExecutorFactory
+
::toFunctionResult)
+ .orElse(
+
DatabaseMetaData
+
.functionResultUnknown), // FUNCTION_TYPE
+ // TODO: remove until Catalog
listFunctions return
+ // CatalogFunction
+ getFunctionName(
+
service.getFunctionDefinition(
+ sessionHandle,
+
UnresolvedIdentifier.of(
+
info.getIdentifier()
+
.toList()))))) // SPECIFIC_NAME
+ .collect(Collectors.toList()));
+ }
+
//
--------------------------------------------------------------------------------------------
// Utilities
//
--------------------------------------------------------------------------------------------
@@ -270,6 +361,10 @@ public class OperationExecutorFactory {
return GenericRowData.of(pack);
}
+ private static ResultSet buildResultSet(ResolvedSchema schema,
List<RowData> data) {
+ return new ResultSet(EOS, null, schema, data);
+ }
+
private static List<Type> getSupportedHiveType() {
return Collections.unmodifiableList(
Arrays.asList(
@@ -294,4 +389,46 @@ public class OperationExecutorFactory {
INTERVAL_YEAR_MONTH_TYPE,
INTERVAL_DAY_TIME_TYPE));
}
+
+ private static int toFunctionResult(FunctionKind kind) {
+ switch (kind) {
+ case SCALAR:
+ case AGGREGATE:
+ return DatabaseMetaData.functionNoTable;
+ case TABLE:
+ case ASYNC_TABLE:
+ case TABLE_AGGREGATE:
+ return DatabaseMetaData.functionReturnsTable;
+ case OTHER:
+ return DatabaseMetaData.functionResultUnknown;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unknown function kind: %s.", kind));
+ }
+ }
+
+ private static String getFunctionName(FunctionDefinition definition) {
+ if (definition instanceof HiveFunction) {
+ return ((HiveFunction<?>)
definition).getFunctionWrapper().getUDFClassName();
+ } else if (definition instanceof UserDefinedFunction) {
+ return ((UserDefinedFunction) definition).functionIdentifier();
+ } else if (definition instanceof TableFunctionDefinition) {
+ return ((TableFunctionDefinition)
definition).getTableFunction().functionIdentifier();
+ } else if (definition instanceof ScalarFunctionDefinition) {
+ return ((ScalarFunctionDefinition)
definition).getScalarFunction().functionIdentifier();
+ } else if (definition instanceof AggregateFunctionDefinition) {
+ return ((AggregateFunctionDefinition) definition)
+ .getAggregateFunction()
+ .functionIdentifier();
+ } else if (definition instanceof TableAggregateFunctionDefinition) {
+ return ((TableAggregateFunctionDefinition) definition)
+ .getTableAggregateFunction()
+ .functionIdentifier();
+ } else if (definition instanceof BuiltInFunctionDefinition) {
+ BuiltInFunctionDefinition builtIn = (BuiltInFunctionDefinition)
definition;
+ return
builtIn.getRuntimeClass().orElse(definition.getClass().getCanonicalName());
+ } else {
+ return definition.getClass().getCanonicalName();
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 9a82e1d5910..ab02e770e36 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -36,6 +36,7 @@ import
org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
@@ -250,116 +251,16 @@ public class HiveServer2EndpointITCase extends
TestLogger {
new String[] {"MANAGED_TABLE",
"VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_1",
- "TABLE",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_2",
- "TABLE",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_3",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_4",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test2",
- "tbl_1",
- "TABLE",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test2",
- "diff_1",
- "TABLE",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test2",
- "tbl_2",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test2",
- "diff_2",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_diff",
- "tbl_1",
- "TABLE",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_diff",
- "tbl_2",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null)));
+ Arrays.asList("default_catalog", "db_test1", "tbl_1",
"TABLE", ""),
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"TABLE", ""),
+ Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_test2", "tbl_1",
"TABLE", ""),
+ Arrays.asList("default_catalog", "db_test2", "diff_1",
"TABLE", ""),
+ Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_test2", "diff_2",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_diff", "tbl_1",
"TABLE", ""),
+ Arrays.asList("default_catalog", "db_diff", "tbl_2",
"VIEW", "")));
}
@Test
@@ -375,39 +276,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
new String[] {"VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_3",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test1",
- "tbl_4",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null),
- Arrays.asList(
- "default_catalog",
- "db_test2",
- "tbl_2",
- "VIEW",
- "",
- null,
- null,
- null,
- null,
- null)));
+ Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW", ""),
+ Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW", "")));
}
@Test
@@ -444,6 +315,70 @@ public class HiveServer2EndpointITCase extends TestLogger {
"INTERVAL_DAY_TIME")));
}
+ @Test
+ public void testGetFunctions() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getFunctions(null,
null, ".*"),
+ ResolvedSchema.of(
+ Column.physical("FUNCTION_CAT", DataTypes.STRING()),
+ Column.physical("FUNCTION_SCHEM", DataTypes.STRING()),
+ Column.physical("FUNCTION_NAME", DataTypes.STRING()),
+ Column.physical("REMARKS", DataTypes.STRING()),
+ Column.physical("FUNCTION_TYPE", DataTypes.INT()),
+ Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
+ compactedResult ->
+ assertThat(compactedResult)
+ .contains(
+ Arrays.asList(
+ "withColumns",
+ "",
+ 0,
+
"org.apache.flink.table.functions.BuiltInFunctionDefinition"),
+ Arrays.asList(
+ "bin",
+ "",
+ 1,
+
"org.apache.hadoop.hive.ql.udf.UDFBin"),
+ Arrays.asList(
+ "parse_url_tuple",
+ "",
+ 2,
+
"org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple")));
+ }
+
+ @Test
+ public void testGetFunctionWithPattern() throws Exception {
+ runGetObjectTest(
+ connection -> {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "CREATE FUNCTION
`default_catalog`.`db_test2`.`my_abs` as '%s'",
+ JavaFunc0.class.getName()));
+ statement.execute(
+ String.format(
+ "CREATE FUNCTION
`default_catalog`.`db_diff`.`your_abs` as '%s'",
+ JavaFunc0.class.getName()));
+ }
+ return
connection.getMetaData().getFunctions("default_catalog", "db.*", "my.*");
+ },
+ ResolvedSchema.of(
+ Column.physical("FUNCTION_CAT", DataTypes.STRING()),
+ Column.physical("FUNCTION_SCHEM", DataTypes.STRING()),
+ Column.physical("FUNCTION_NAME", DataTypes.STRING()),
+ Column.physical("REMARKS", DataTypes.STRING()),
+ Column.physical("FUNCTION_TYPE", DataTypes.INT()),
+ Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
+ Collections.singletonList(
+ Arrays.asList(
+ "default_catalog",
+ "db_test2",
+ "my_abs",
+ "",
+ 0,
+ JavaFunc0.class.getName())));
+ }
+
@Test
public void testGetInfo() throws Exception {
try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
@@ -508,7 +443,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
try (Connection connection = getInitializedConnection();
java.sql.ResultSet result =
resultSetSupplier.apply(connection)) {
assertSchemaEquals(expectedSchema, result.getMetaData());
- validator.accept(collect(result, expectedSchema.getColumnCount()));
+ validator.accept(collectAndCompact(result,
expectedSchema.getColumnCount()));
}
}
@@ -608,12 +543,18 @@ public class HiveServer2EndpointITCase extends TestLogger
{
}
}
- private Set<List<Object>> collect(java.sql.ResultSet result, int
columnCount) throws Exception {
+ private Set<List<Object>> collectAndCompact(java.sql.ResultSet result, int
columnCount)
+ throws Exception {
List<List<Object>> actual = new ArrayList<>();
while (result.next()) {
List<Object> row = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
- row.add(result.getObject(i));
+ Object value = result.getObject(i);
+ // ignore the null value for better presentation
+ if (value == null) {
+ continue;
+ }
+ row.add(value);
}
actual.add(row);
}
diff --git
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 1fd7e339c8d..f12c44250c4 100644
---
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -22,10 +22,13 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -234,6 +237,39 @@ public interface SqlGatewayService {
Set<TableKind> tableKinds)
throws SqlGatewayException;
+ /**
+ * List all user defined functions.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param catalogName name string of the given catalog.
+ * @param databaseName name string of the given database.
+ * @return user defined functions info.
+ */
+ Set<FunctionInfo> listUserDefinedFunctions(
+ SessionHandle sessionHandle, String catalogName, String
databaseName)
+ throws SqlGatewayException;
+
+ /**
+ * List all available system functions.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @return system functions info.
+ */
+ Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) throws
SqlGatewayException;
+
+ /**
+ * Get the specific definition of the function. If the input identifier
only contains the
+ * function name, it is resolved with the order of the temporary system
function, system
+ * function, temporary function and catalog function.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param functionIdentifier identifier of the function.
+ * @return the definition of the function.
+ */
+ FunctionDefinition getFunctionDefinition(
+ SessionHandle sessionHandle, UnresolvedIdentifier
functionIdentifier)
+ throws SqlGatewayException;
+
//
-------------------------------------------------------------------------------------------
// Utilities
//
-------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
new file mode 100644
index 00000000000..7975235f04c
--- /dev/null
+++
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/FunctionInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Info to describe the function. It is not equivalent to the {@link
FunctionDefinition} that needs
+ * to load the implementation, which may require to download the jar from the
remote to the local
+ * machine and load the classes. Comparing to the {@link FunctionDefinition},
the {@link
+ * FunctionInfo} return the available information in the current state, which
is much lighter.
+ */
+@PublicEvolving
+public class FunctionInfo {
+
+ /** Identifier of the function. */
+ private final FunctionIdentifier identifier;
+ /** Kind of the function. If the value is null, it means kind of the
function is unresolved. */
+ private final @Nullable FunctionKind kind;
+
+ public FunctionInfo(FunctionIdentifier identifier) {
+ this(identifier, null);
+ }
+
+ public FunctionInfo(FunctionIdentifier identifier, @Nullable FunctionKind
kind) {
+ this.identifier = identifier;
+ this.kind = kind;
+ }
+
+ public FunctionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public Optional<FunctionKind> getKind() {
+ return Optional.ofNullable(kind);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FunctionInfo)) {
+ return false;
+ }
+ FunctionInfo that = (FunctionInfo) o;
+ return Objects.equals(identifier, that.identifier) && kind ==
that.kind;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier, kind);
+ }
+
+ @Override
+ public String toString() {
+ return "FunctionInfo{identifier=" + identifier + ", kind=" + kind +
'}';
+ }
+}
diff --git
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 8f0b914c223..8508edbd23a 100644
---
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -21,10 +21,13 @@ package org.apache.flink.table.gateway.api.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -143,6 +146,26 @@ public class MockedSqlGatewayService implements
SqlGatewayService {
throw new UnsupportedOperationException();
}
+ @Override
+ public Set<FunctionInfo> listUserDefinedFunctions(
+ SessionHandle sessionHandle, String catalogName, String
databaseName)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition(
+ SessionHandle sessionHandle, UnresolvedIdentifier
functionIdentifier)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public GatewayInfo getGatewayInfo() {
throw new UnsupportedOperationException();
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 3b39430301b..8114fc5b79a 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -22,10 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -252,6 +255,43 @@ public class SqlGatewayServiceImpl implements
SqlGatewayService {
}
}
+ @Override
+ public Set<FunctionInfo> listUserDefinedFunctions(
+ SessionHandle sessionHandle, String catalogName, String
databaseName)
+ throws SqlGatewayException {
+ try {
+ return getSession(sessionHandle)
+ .createExecutor()
+ .listUserDefinedFunctions(catalogName, databaseName);
+ } catch (Throwable t) {
+ LOG.error("Failed to listUserDefinedFunctions.", t);
+ throw new SqlGatewayException("Failed to
listUserDefinedFunctions.", t);
+ }
+ }
+
+ public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) {
+ try {
+ return
getSession(sessionHandle).createExecutor().listSystemFunctions();
+ } catch (Throwable t) {
+ LOG.error("Failed to listSystemFunctions.", t);
+ throw new SqlGatewayException("Failed to listSystemFunctions.", t);
+ }
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition(
+ SessionHandle sessionHandle, UnresolvedIdentifier
functionIdentifier)
+ throws SqlGatewayException {
+ try {
+ return getSession(sessionHandle)
+ .createExecutor()
+ .getFunctionDefinition(functionIdentifier);
+ } catch (Throwable t) {
+ LOG.error("Failed to getFunctionDefinition.", t);
+ throw new SqlGatewayException("Failed to getFunctionDefinition.",
t);
+ }
+ }
+
@Override
public GatewayInfo getGatewayInfo() {
return GatewayInfo.INSTANCE;
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index bb70e8947c1..b949d299d22 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -120,6 +120,10 @@ public class SessionContext {
return endpointVersion;
}
+ public SessionState getSessionState() {
+ return sessionState;
+ }
+
public void set(String key, String value) {
try {
// Test whether the key value will influence the creation of the
Executor.
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 20e04ac3b75..87c4661f24d 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -29,10 +29,14 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
@@ -46,6 +50,9 @@ import
org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,6 +72,8 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
/** An executor to execute the {@link Operation}. */
public class OperationExecutor {
+ private static final Logger LOG =
LoggerFactory.getLogger(OperationExecutor.class);
+
private final SessionContext sessionContext;
private final Configuration executionConfig;
@@ -111,17 +120,18 @@ public class OperationExecutor {
}
public String getCurrentCatalog() {
- return getTableEnvironment().getCatalogManager().getCurrentCatalog();
+ return
sessionContext.getSessionState().catalogManager.getCurrentCatalog();
}
public Set<String> listCatalogs() {
- return getTableEnvironment().getCatalogManager().listCatalogs();
+ return sessionContext.getSessionState().catalogManager.listCatalogs();
}
public Set<String> listDatabases(String catalogName) {
return new HashSet<>(
- getTableEnvironment()
- .getCatalogManager()
+ sessionContext
+ .getSessionState()
+ .catalogManager
.getCatalog(catalogName)
.orElseThrow(
() ->
@@ -146,6 +156,53 @@ public class OperationExecutor {
}
}
+ public Set<FunctionInfo> listUserDefinedFunctions(String catalogName,
String databaseName) {
+ return sessionContext.getSessionState().functionCatalog
+ .getUserDefinedFunctions(catalogName, databaseName).stream()
+ // Load the CatalogFunction from the remote catalog is time
wasted. Set the
+ // FunctionKind null.
+ .map(FunctionInfo::new)
+ .collect(Collectors.toSet());
+ }
+
+ public Set<FunctionInfo> listSystemFunctions() {
+ Set<FunctionInfo> info = new HashSet<>();
+ for (String functionName :
sessionContext.getSessionState().moduleManager.listFunctions()) {
+ try {
+ info.add(
+ sessionContext
+ .getSessionState()
+ .moduleManager
+ .getFunctionDefinition(functionName)
+ .map(
+ definition ->
+ new FunctionInfo(
+
FunctionIdentifier.of(functionName),
+ definition.getKind()))
+ .orElse(new
FunctionInfo(FunctionIdentifier.of(functionName))));
+ } catch (Throwable t) {
+ // Failed to load the function. Ignore.
+ LOG.error(
+ String.format("Failed to load the system function
`%s`.", functionName), t);
+ }
+ }
+ return info;
+ }
+
+ public FunctionDefinition getFunctionDefinition(UnresolvedIdentifier
identifier) {
+ return sessionContext
+ .getSessionState()
+ .functionCatalog
+ .lookupFunction(identifier)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ "Can not find the definition:
%s.",
+ identifier.asSummaryString())))
+ .getDefinition();
+ }
+
//
--------------------------------------------------------------------------------------------
@VisibleForTesting
@@ -226,7 +283,7 @@ public class OperationExecutor {
private Set<TableInfo> listTables(
String catalogName, String databaseName, boolean includeViews) {
- CatalogManager catalogManager =
getTableEnvironment().getCatalogManager();
+ CatalogManager catalogManager =
sessionContext.getSessionState().catalogManager;
Map<String, TableInfo> views = new HashMap<>();
catalogManager
.listViews(catalogName, databaseName)
@@ -257,9 +314,9 @@ public class OperationExecutor {
}
private Set<TableInfo> listViews(String catalogName, String databaseName) {
- CatalogManager catalogManager =
getTableEnvironment().getCatalogManager();
return Collections.unmodifiableSet(
- catalogManager.listViews(catalogName, databaseName).stream()
+
sessionContext.getSessionState().catalogManager.listViews(catalogName,
databaseName)
+ .stream()
.map(
name ->
new TableInfo(
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index ec370610edb..6447771a33d 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.service;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -31,8 +32,10 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -45,7 +48,10 @@ import
org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
@@ -73,6 +79,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
+import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
+import static org.apache.flink.table.functions.FunctionKind.OTHER;
+import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
@@ -405,6 +414,60 @@ public class SqlGatewayServiceITCase extends
AbstractTestBase {
.isEqualTo(Collections.emptySet());
}
+ @Test
+ public void testListSystemFunctions() {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .registerCatalog("cat1", new
GenericInMemoryCatalog("cat1"))
+ .registerCatalog("cat2", new
GenericInMemoryCatalog("cat2"))
+ .build();
+ SessionHandle sessionHandle = service.openSession(environment);
+
+ assertThat(service.listSystemFunctions(sessionHandle))
+ .contains(
+ new FunctionInfo(FunctionIdentifier.of("sin"), SCALAR),
+ new FunctionInfo(FunctionIdentifier.of("sum"),
AGGREGATE),
+ new FunctionInfo(FunctionIdentifier.of("as"), OTHER));
+ }
+
+ @Test
+ public void testListUserDefinedFunctions() {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .registerCatalog("cat1", new
GenericInMemoryCatalog("cat1"))
+ .registerCatalog("cat2", new
GenericInMemoryCatalog("cat2"))
+ .build();
+ SessionHandle sessionHandle = service.openSession(environment);
+ TableEnvironment tEnv =
+
service.getSession(sessionHandle).createExecutor().getTableEnvironment();
+ tEnv.createTemporarySystemFunction(
+ "count_distinct",
JavaUserDefinedAggFunctions.CountDistinct.class);
+ tEnv.createFunction("java1",
JavaUserDefinedScalarFunctions.JavaFunc1.class);
+ tEnv.createTemporaryFunction("table_func0", TableFunc0.class);
+
+ // register catalog function in another catalog
+ tEnv.createFunction(
+ "cat1.default.filter_out_function",
JavaUserDefinedScalarFunctions.JavaFunc1.class);
+
+ assertThat(
+ service.listUserDefinedFunctions(
+ sessionHandle, "default_catalog",
"default_database"))
+ .contains(
+ new
FunctionInfo(FunctionIdentifier.of("count_distinct")),
+ new FunctionInfo(
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(
+ "default_catalog",
"default_database", "java1"))),
+ new FunctionInfo(
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(
+ "default_catalog",
+ "default_database",
+ "table_func0"))));
+ }
+
//
--------------------------------------------------------------------------------------------
// Concurrent tests
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 75a5be4d0ff..50e51a6ce85 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -45,7 +45,6 @@ import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -290,7 +289,50 @@ public final class FunctionCatalog {
* functions and catalog functions in the current catalog and current
database.
*/
public String[] getUserDefinedFunctions() {
- return getUserDefinedFunctionNames().toArray(new String[0]);
+ return getUserDefinedFunctions(
+ catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase())
+ .stream()
+ .map(FunctionIdentifier::getFunctionName)
+ .toArray(String[]::new);
+ }
+
+ /**
+ * Get names of all user including temp system functions, temp catalog *
functions and catalog
+ * functions in the specified catalog and specified database.
+ */
+ public Set<FunctionIdentifier> getUserDefinedFunctions(
+ String catalogName, String databaseName) {
+ // add temp system functions
+ Set<FunctionIdentifier> result =
+ tempSystemFunctions.keySet().stream()
+ .map(FunctionIdentifier::of)
+ .collect(Collectors.toSet());
+
+ // add temp catalog functions
+ result.addAll(
+ tempCatalogFunctions.keySet().stream()
+ .filter(
+ oi ->
+ oi.getCatalogName().equals(catalogName)
+ &&
oi.getDatabaseName().equals(databaseName))
+ .map(FunctionIdentifier::of)
+ .collect(Collectors.toSet()));
+
+ // add catalog functions
+ Catalog catalog = catalogManager.getCatalog(catalogName).get();
+ try {
+ catalog.listFunctions(databaseName)
+ .forEach(
+ name ->
+ result.add(
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(
+ catalogName,
databaseName, name))));
+ } catch (DatabaseNotExistException e) {
+ // Ignore since there will always be a current database of the
current catalog
+ }
+
+ return result;
}
/**
@@ -298,7 +340,13 @@ public final class FunctionCatalog {
* functions and catalog functions in the current catalog and current
database.
*/
public String[] getFunctions() {
- Set<String> result = getUserDefinedFunctionNames();
+ Set<String> result =
+ getUserDefinedFunctions(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase())
+ .stream()
+ .map(FunctionIdentifier::getFunctionName)
+ .collect(Collectors.toSet());
// add system functions
result.addAll(moduleManager.listFunctions());
@@ -523,35 +571,6 @@ public final class FunctionCatalog {
//
--------------------------------------------------------------------------------------------
- private Set<String> getUserDefinedFunctionNames() {
-
- // add temp system functions
- Set<String> result = new HashSet<>(tempSystemFunctions.keySet());
-
- String currentCatalog = catalogManager.getCurrentCatalog();
- String currentDatabase = catalogManager.getCurrentDatabase();
-
- // add temp catalog functions
- result.addAll(
- tempCatalogFunctions.keySet().stream()
- .filter(
- oi ->
-
oi.getCatalogName().equals(currentCatalog)
- &&
oi.getDatabaseName().equals(currentDatabase))
- .map(ObjectIdentifier::getObjectName)
- .collect(Collectors.toSet()));
-
- // add catalog functions
- Catalog catalog = catalogManager.getCatalog(currentCatalog).get();
- try {
- result.addAll(catalog.listFunctions(currentDatabase));
- } catch (DatabaseNotExistException e) {
- // Ignore since there will always be a current database of the
current catalog
- }
-
- return result;
- }
-
private Optional<ContextResolvedFunction>
resolvePreciseFunctionReference(ObjectIdentifier oi) {
// resolve order:
// 1. Temporary functions
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
index 586bc3c14a6..f1da6cbee2a 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionIdentifier.java
@@ -87,6 +87,14 @@ public final class FunctionIdentifier implements
Serializable {
return Optional.ofNullable(functionName);
}
+ public String getFunctionName() {
+ if (objectIdentifier != null) {
+ return objectIdentifier.getObjectName();
+ } else {
+ return functionName;
+ }
+ }
+
/** List of the component names of this function identifier. */
public List<String> toList() {
if (objectIdentifier != null) {