This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5494e91ddb6 [FLINK-38198] Support WITH Clause in CREATE FUNCTION Statement (#26874) 5494e91ddb6 is described below commit 5494e91ddb6d43d6501f6f812afcb6dc8624e089 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Aug 11 09:33:01 2025 +0200 [FLINK-38198] Support WITH Clause in CREATE FUNCTION Statement (#26874) --- docs/content.zh/docs/dev/table/sql/create.md | 1 + docs/content/docs/dev/table/sql/create.md | 1 + flink-python/pyflink/table/catalog.py | 10 ++ .../src/main/codegen/includes/parserImpls.ftl | 8 +- .../flink/sql/parser/ddl/SqlCreateFunction.java | 23 ++- .../flink/sql/parser/FlinkSqlParserImplTest.java | 14 ++ .../apache/flink/table/api/FunctionDescriptor.java | 169 ++++++++++++++++++ .../apache/flink/table/api/TableEnvironment.java | 63 +++++++ .../table/api/internal/TableEnvironmentImpl.java | 51 +++++- .../flink/table/catalog/CatalogFunctionImpl.java | 37 +++- .../table/catalog/ContextResolvedFunction.java | 13 +- .../flink/table/catalog/FunctionCatalog.java | 68 ++++---- .../ddl/CreateTempSystemFunctionOperation.java | 5 +- .../flink/table/catalog/FunctionCatalogTest.java | 191 +++++++++++++++------ .../flink/table/resource/ResourceManagerTest.java | 15 +- .../flink/table/catalog/CatalogFunction.java | 7 + .../operations/SqlNodeToOperationConversion.java | 13 +- .../nodes/exec/serde/RexNodeJsonSerializer.java | 16 +- .../operations/SqlDdlToOperationConverterTest.java | 2 +- ...erializedTableNodeToOperationConverterTest.java | 3 +- .../nodes/exec/serde/RexNodeJsonSerdeTest.java | 120 +++++++++++-- 21 files changed, 699 insertions(+), 131 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md index 3c18e3e0128..779ea6e6e72 100644 --- a/docs/content.zh/docs/dev/table/sql/create.md +++ b/docs/content.zh/docs/dev/table/sql/create.md @@ -863,6 +863,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [[catalog_name.]db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ] + [WITH (key1=val1, key2=val2, ...)] ``` 创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若 catalog 中,已经有同名的函数注册了,则无法注册。 diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md index 9147b929f8c..10d570a6e34 100644 --- a/docs/content/docs/dev/table/sql/create.md +++ b/docs/content/docs/dev/table/sql/create.md @@ -860,6 +860,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ] + [WITH (key1=val1, key2=val2, ...)] ``` Create a catalog function that has catalog and database namespaces with the identifier and optional language tag. If a function with the same name already exists in the catalog, an exception is thrown. diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 888f995d688..8033a95ba2c 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -1118,6 +1118,16 @@ class CatalogFunction(object): """ return self._j_catalog_function.getFunctionLanguage() + def get_options(self) -> Dict[str, str]: + """ + Returns a map of string-based options. + + :return: Property map of the function. + + .. 
versionadded:: 2.2.0 + """ + return dict(self._j_catalog_function.getOptions()) + @PublicEvolving() class CatalogModel(object): diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 9419d165874..fb2b688b42c 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -396,6 +396,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) : boolean ifNotExists = false; boolean isSystemFunction = false; SqlNodeList resourceInfos = SqlNodeList.EMPTY; + SqlNodeList propertyList = SqlNodeList.EMPTY; SqlParserPos functionLanguagePos = null; } { @@ -466,6 +467,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) : )* { resourceInfos = new SqlNodeList(resourceList, s.pos()); } ] + [ + <WITH> + propertyList = Properties() + ] { return new SqlCreateFunction( s.pos(), @@ -475,7 +480,8 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) : ifNotExists, isTemporary, isSystemFunction, - resourceInfos); + resourceInfos, + propertyList); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java index 252ef5f6864..af40191e9ae 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java @@ -18,6 +18,8 @@ package org.apache.flink.sql.parser.ddl; +import org.apache.flink.sql.parser.SqlUnparseUtils; + import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlCreate; import org.apache.calcite.sql.SqlIdentifier; @@ -53,6 +55,7 @@ public class SqlCreateFunction extends SqlCreate { private final boolean isSystemFunction; private final SqlNodeList resourceInfos; + private final SqlNodeList propertyList; public SqlCreateFunction( SqlParserPos pos, @@ -62,7 +65,8 @@ public class SqlCreateFunction extends SqlCreate { boolean ifNotExists, boolean isTemporary, boolean isSystemFunction, - SqlNodeList resourceInfos) { + SqlNodeList resourceInfos, + SqlNodeList propertyList) { super(OPERATOR, pos, false, ifNotExists); this.functionIdentifier = requireNonNull(functionIdentifier); this.functionClassName = requireNonNull(functionClassName); @@ -70,6 +74,7 @@ public class SqlCreateFunction extends SqlCreate { this.isTemporary = isTemporary; this.functionLanguage = functionLanguage; this.resourceInfos = resourceInfos; + this.propertyList = requireNonNull(propertyList); } @Override @@ -103,7 +108,7 @@ public class SqlCreateFunction extends SqlCreate { writer.keyword("LANGUAGE"); writer.keyword(functionLanguage); } - if (resourceInfos.size() > 0) { + if (!resourceInfos.isEmpty()) { writer.keyword("USING"); SqlWriter.Frame withFrame = writer.startList("", ""); for (SqlNode resourcePath : resourceInfos) { @@ -112,6 +117,16 @@ public class SqlCreateFunction extends SqlCreate { } writer.endList(withFrame); } + if (!this.propertyList.isEmpty()) { + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } } 
public boolean isIfNotExists() { @@ -141,4 +156,8 @@ public class SqlCreateFunction extends SqlCreate { public List<SqlNode> getResourceInfos() { return resourceInfos.getList(); } + + public SqlNodeList getPropertyList() { + return propertyList; + } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index f84762645b0..c71cc1e15a0 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2491,6 +2491,20 @@ class FlinkSqlParserImplTest extends SqlParserTest { + "Was expecting:\n" + " \"JAR\" ...\n" + " .*"); + + sql("create function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')") + .ok( + "CREATE FUNCTION `FUNCTION1` AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar' WITH (\n" + + " 'k1' = 'v1',\n" + + " 'k2' = 'v2'\n" + + ")"); + + sql("create temporary function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')") + .ok( + "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar' WITH (\n" + + " 'k1' = 'v1',\n" + + " 'k2' = 'v2'\n" + + ")"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java new file mode 100644 index 00000000000..2dad5982e5b --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; +import org.apache.flink.table.resource.ResourceUri; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Describes a {@link CatalogFunction}. + * + * <p>A {@link FunctionDescriptor} is a template for creating a {@link CatalogFunction} instance. 
It + * closely resembles the "CREATE FUNCTION" SQL DDL statement. + * + * <p>This can be used to register a table in the Table API, see {@link + * TableEnvironment#createFunction(String, FunctionDescriptor)}. + */ +@PublicEvolving +public class FunctionDescriptor { + + private final String className; + private final FunctionLanguage language; + private final List<ResourceUri> resourceUris; + private final Map<String, String> options; + + private FunctionDescriptor( + String className, + FunctionLanguage language, + List<ResourceUri> resourceUris, + Map<String, String> options) { + this.className = className; + this.language = language; + this.resourceUris = resourceUris; + this.options = options; + } + + /** Creates a {@link Builder} for a function descriptor with the given class name. */ + public static Builder forClassName(String className) { + return new Builder(className); + } + + /** Creates a {@link Builder} for a function descriptor for the given function class. */ + public static Builder forFunctionClass(Class<? extends UserDefinedFunction> functionClass) { + try { + UserDefinedFunctionHelper.validateClass(functionClass); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Can not create a function '%s' due to implementation errors.", + functionClass.getName()), + t); + } + return new Builder(functionClass.getName()).language(FunctionLanguage.JAVA); + } + + public String getClassName() { + return className; + } + + public FunctionLanguage getLanguage() { + return language; + } + + public List<ResourceUri> getResourceUris() { + return resourceUris; + } + + public Map<String, String> getOptions() { + return options; + } + + /** Builder for {@link FunctionDescriptor}. */ + @PublicEvolving + public static final class Builder { + private final String className; + private FunctionLanguage language = FunctionLanguage.JAVA; + private final List<ResourceUri> resourceUris = new ArrayList<>(); + private final Map<String, String> options = new HashMap<>(); + + private Builder(String className) { + this.className = className; + } + + /** + * Sets the language of the function. Equivalent to the {@code LANGUAGE} clause in the + * "CREATE FUNCTION" SQL DDL statement. + */ + public Builder language(FunctionLanguage language) { + Preconditions.checkNotNull(language, "Function language must not be null."); + this.language = language; + return this; + } + + /** + * Adds a list of resource URIs to the function descriptor. Equivalent to the {@code USING} + * clause in the "CREATE FUNCTION" SQL DDL statement. + */ + public Builder resourceUris(List<ResourceUri> uri) { + Preconditions.checkNotNull(uri, "Resource URIs must not be null."); + this.resourceUris.addAll(uri); + return this; + } + + /** + * Adds a single resource URI to the function descriptor. Equivalent to the {@code USING} + * clause in the "CREATE FUNCTION" SQL DDL statement. + */ + public Builder resourceUri(ResourceUri uri) { + Preconditions.checkNotNull(uri, "Resource URI must not be null."); + this.resourceUris.add(uri); + return this; + } + + /** + * Adds an option to the function descriptor. Equivalent to the {@code WITH} clause in the + * "CREATE FUNCTION" SQL DDL statement. + */ + public Builder option(String key, String value) { + Preconditions.checkNotNull(key, "Option key must not be null."); + Preconditions.checkNotNull(value, "Option value must not be null."); + this.options.put(key, value); + return this; + } + + /** + * Adds multiple options to the function descriptor. 
Equivalent to the {@code WITH} clause + * in the "CREATE FUNCTION" SQL DDL statement. + */ + public Builder options(Map<String, String> options) { + Preconditions.checkNotNull(options, "Options must not be null."); + this.options.putAll(options); + return this; + } + + public FunctionDescriptor build() { + return new FunctionDescriptor( + className, + language, + Collections.unmodifiableList(resourceUris), + Collections.unmodifiableMap(options)); + } + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 3b514aa4ae4..b0bc8538833 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -561,6 +561,21 @@ public interface TableEnvironment { */ void createFunction(String path, String className, List<ResourceUri> resourceUris); + /** + * Creates a catalog function in the given path described by a {@link FunctionDescriptor}. + * + * <p>Compared to system functions with a globally defined name, catalog functions are always + * (implicitly or explicitly) identified by a catalog and database. + * + * <p>There must not be another function (temporary or permanent) registered under the same + * path. + * + * @param path The path under which the function will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param functionDescriptor The descriptor of the function to create. + */ + void createFunction(String path, FunctionDescriptor functionDescriptor); + /** * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the * specific class name and user defined resource uri. @@ -587,6 +602,23 @@ public interface TableEnvironment { void createFunction( String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists); + /** + * Creates a catalog function in the given path described by a {@link FunctionDescriptor}. + * + * <p>Compared to system functions with a globally defined name, catalog functions are always + * (implicitly or explicitly) identified by a catalog and database. + * + * <p>There must not be another function (temporary or permanent) registered under the same + * path. + * + * @param path The path under which the function will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param functionDescriptor The descriptor of the function to create. + * @param ignoreIfExists If a function exists under the given path and this flag is set, no + * operation is executed. An exception is thrown otherwise. + */ + void createFunction(String path, FunctionDescriptor functionDescriptor, boolean ignoreIfExists); + /** * Registers a {@link UserDefinedFunction} class as a temporary catalog function. * @@ -651,6 +683,24 @@ public interface TableEnvironment { */ void createTemporaryFunction(String path, String className, List<ResourceUri> resourceUris); + /** + * Creates a temporary catalog function using a {@link FunctionDescriptor} to describe the + * function. + * + * <p>Compared to {@link #createTemporarySystemFunction(String, String, List)} with a globally + * defined name, catalog functions are always (implicitly or explicitly) identified by a catalog + * and database. + * + * <p>Temporary functions can shadow permanent ones. 
If a permanent function under a given name + * exists, it will be inaccessible in the current session. To make the permanent function + * available again one can drop the corresponding temporary function. + * + * @param path The path under which the function will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param functionDescriptor The descriptor of the function to create. + */ + void createTemporaryFunction(String path, FunctionDescriptor functionDescriptor); + /** * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific * class name and user defined resource uri. @@ -672,6 +722,19 @@ public interface TableEnvironment { void createTemporarySystemFunction( String name, String className, List<ResourceUri> resourceUris); + /** + * Creates a temporary system function using a {@link FunctionDescriptor} to describe the + * function. + * + * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name + * exists, it will be inaccessible in the current session. To make the permanent function + * available again one can drop the corresponding temporary system function. + * + * @param name The name under which the function will be registered globally. + * @param functionDescriptor The descriptor of the function to create. + */ + void createTemporarySystemFunction(String name, FunctionDescriptor functionDescriptor); + /** * Drops a catalog function registered in the given path. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index f3c1e2f628f..37017b9df32 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -32,6 +32,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainFormat; import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.ModelDescriptor; import org.apache.flink.table.api.PlanReference; import org.apache.flink.table.api.ResultKind; @@ -417,7 +418,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public void createTemporarySystemFunction( String name, String className, List<ResourceUri> resourceUris) { - functionCatalog.registerTemporarySystemFunction(name, className, resourceUris); + createTemporarySystemFunction( + name, + FunctionDescriptor.forClassName(className).resourceUris(resourceUris).build()); + } + + @Override + public void createTemporarySystemFunction(String name, FunctionDescriptor functionDescriptor) { + functionCatalog.registerTemporarySystemFunction(name, functionDescriptor, false); } @Override @@ -435,9 +443,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { String path, Class<? 
extends UserDefinedFunction> functionClass, boolean ignoreIfExists) { - final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); - functionCatalog.registerCatalogFunction( - unresolvedIdentifier, functionClass, ignoreIfExists); + createFunction( + path, FunctionDescriptor.forFunctionClass(functionClass).build(), ignoreIfExists); } @Override @@ -448,9 +455,27 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public void createFunction( String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) { + createFunction( + path, + FunctionDescriptor.forClassName(className) + .language(FunctionLanguage.JAVA) + .resourceUris(resourceUris) + .build(), + ignoreIfExists); + } + + @Override + public void createFunction(String path, FunctionDescriptor functionDescriptor) { + createFunction(path, functionDescriptor, false); + } + + @Override + public void createFunction( + String path, FunctionDescriptor functionDescriptor, boolean ignoreIfExists) { + final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); functionCatalog.registerCatalogFunction( - unresolvedIdentifier, className, resourceUris, ignoreIfExists); + unresolvedIdentifier, functionDescriptor, ignoreIfExists); } @Override @@ -477,9 +502,23 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public void createTemporaryFunction( String path, String className, List<ResourceUri> resourceUris) { + createTemporaryFunction( + path, + FunctionDescriptor.forClassName(className) + .language(FunctionLanguage.JAVA) + .resourceUris(resourceUris) + .build()); + } + + @Override + public void createTemporaryFunction(String path, FunctionDescriptor functionDescriptor) { final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); final CatalogFunction catalogFunction = - new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris); + new CatalogFunctionImpl( + functionDescriptor.getClassName(), + functionDescriptor.getLanguage(), + functionDescriptor.getResourceUris(), + functionDescriptor.getOptions()); functionCatalog.registerTemporaryCatalogFunction( unresolvedIdentifier, catalogFunction, false); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java index d8ad6932580..2a6829cb78d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java @@ -24,6 +24,7 @@ import org.apache.flink.util.StringUtils; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -36,6 +37,7 @@ public class CatalogFunctionImpl implements CatalogFunction { private final String className; // Fully qualified class name of the function private final FunctionLanguage functionLanguage; private final List<ResourceUri> resourceUris; + private final Map<String, String> options; public CatalogFunctionImpl(String className) { this(className, FunctionLanguage.JAVA, Collections.emptyList()); @@ -47,12 +49,21 @@ public class CatalogFunctionImpl implements CatalogFunction { public CatalogFunctionImpl( String className, FunctionLanguage functionLanguage, List<ResourceUri> resourceUris) { + this(className, 
functionLanguage, resourceUris, Collections.emptyMap()); + } + + public CatalogFunctionImpl( + String className, + FunctionLanguage functionLanguage, + List<ResourceUri> resourceUris, + Map<String, String> options) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(className), "className cannot be null or empty"); this.className = className; this.functionLanguage = checkNotNull(functionLanguage, "functionLanguage cannot be null"); this.resourceUris = resourceUris; + this.options = checkNotNull(options, "options cannot be null"); } @Override @@ -63,7 +74,10 @@ public class CatalogFunctionImpl implements CatalogFunction { @Override public CatalogFunction copy() { return new CatalogFunctionImpl( - getClassName(), functionLanguage, Collections.unmodifiableList(resourceUris)); + getClassName(), + functionLanguage, + Collections.unmodifiableList(resourceUris), + Collections.unmodifiableMap(options)); } @Override @@ -86,23 +100,31 @@ public class CatalogFunctionImpl implements CatalogFunction { return resourceUris; } + @Override + public Map<String, String> getOptions() { + return options; + } + @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } + CatalogFunctionImpl that = (CatalogFunctionImpl) o; return Objects.equals(className, that.className) && functionLanguage == that.functionLanguage - && Objects.equals(resourceUris, that.resourceUris); + && Objects.equals(resourceUris, that.resourceUris) + && Objects.equals(options, that.options); } @Override public int hashCode() { - return Objects.hash(className, functionLanguage, resourceUris); + int result = Objects.hashCode(className); + result = 31 * result + Objects.hashCode(functionLanguage); + result = 31 * result + Objects.hashCode(resourceUris); + result = 31 * result + Objects.hashCode(options); + return result; } @Override @@ -116,6 +138,9 @@ public class CatalogFunctionImpl implements CatalogFunction { + "', " + "functionResource='" + getFunctionResources() + + "', " + + "options='" + + getOptions() + "'}"; } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java index 8ae5c9ff179..089e08fd02d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java @@ -184,20 +184,23 @@ public final class ContextResolvedFunction { @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } + ContextResolvedFunction that = (ContextResolvedFunction) o; return isTemporary == that.isTemporary && Objects.equals(functionIdentifier, that.functionIdentifier) - && functionDefinition.equals(that.functionDefinition); + && functionDefinition.equals(that.functionDefinition) + && Objects.equals(catalogFunction, that.catalogFunction); } @Override public int hashCode() { - return Objects.hash(isTemporary, functionIdentifier, functionDefinition); + int result = Boolean.hashCode(isTemporary); + result = 31 * result + Objects.hashCode(functionIdentifier); + result = 31 * result + functionDefinition.hashCode(); + result = 31 * result + Objects.hashCode(catalogFunction); + return result; } } diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 058e87a36b6..d0d72467c33 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -21,6 +21,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -115,23 +117,17 @@ public final class FunctionCatalog { name, new InlineCatalogFunction(definition), ignoreIfExists); } - /** Registers a uninstantiated temporary system function. */ - public void registerTemporarySystemFunction( - String name, - String fullyQualifiedName, - FunctionLanguage language, - boolean ignoreIfExists) { - registerTemporarySystemFunction( - name, new CatalogFunctionImpl(fullyQualifiedName, language), ignoreIfExists); - } - /** Registers a temporary system function from resource uris. */ public void registerTemporarySystemFunction( - String name, String className, List<ResourceUri> resourceUris) { + String name, FunctionDescriptor functionDescriptor, boolean ignoreIfExists) { registerTemporarySystemFunction( name, - new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris), - false); + new CatalogFunctionImpl( + functionDescriptor.getClassName(), + functionDescriptor.getLanguage(), + functionDescriptor.getResourceUris(), + functionDescriptor.getOptions()), + ignoreIfExists); } /** Drops a temporary system function. Returns true if a function was dropped. */ @@ -208,37 +204,18 @@ public final class FunctionCatalog { return dropTempCatalogFunction(identifier, ignoreIfNotExist) != null; } - /** Registers a catalog function by also considering temporary catalog functions. */ - public void registerCatalogFunction( - UnresolvedIdentifier unresolvedIdentifier, - Class<? 
extends UserDefinedFunction> functionClass, - boolean ignoreIfExists) { - final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - final CatalogFunction catalogFunction = - new CatalogFunctionImpl(functionClass.getName(), FunctionLanguage.JAVA); - - try { - UserDefinedFunctionHelper.validateClass(functionClass); - } catch (Throwable t) { - throw new ValidationException( - String.format( - "Could not register catalog function '%s' due to implementation errors.", - identifier.asSummaryString()), - t); - } - - registerCatalogFunction(identifier, catalogFunction, ignoreIfExists); - } - public void registerCatalogFunction( UnresolvedIdentifier unresolvedIdentifier, - String className, - List<ResourceUri> resourceUris, + FunctionDescriptor functionDescriptor, boolean ignoreIfExists) { final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); final CatalogFunction catalogFunction = - new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris); + new CatalogFunctionImpl( + functionDescriptor.getClassName(), + functionDescriptor.getLanguage(), + functionDescriptor.getResourceUris(), + functionDescriptor.getOptions()); registerCatalogFunction(identifier, catalogFunction, ignoreIfExists); } @@ -888,5 +865,20 @@ public final class FunctionCatalog { public FunctionDefinition getDefinition() { return definition; } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + InlineCatalogFunction that = (InlineCatalogFunction) o; + return Objects.equals(definition, that.definition); + } + + @Override + public int hashCode() { + return Objects.hashCode(definition); + } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java index d2bb01d062c..b9118400016 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java @@ -48,11 +48,12 @@ public class CreateTempSystemFunctionOperation implements CreateOperation { String functionClass, boolean ignoreIfExists, FunctionLanguage functionLanguage, - List<ResourceUri> resourceUris) { + List<ResourceUri> resourceUris, + Map<String, String> options) { this.functionName = functionName; this.ignoreIfExists = ignoreIfExists; this.catalogFunction = - new CatalogFunctionImpl(functionClass, functionLanguage, resourceUris); + new CatalogFunctionImpl(functionClass, functionLanguage, resourceUris, options); } public CreateTempSystemFunctionOperation( diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java index fa9baacbd9e..ae6c8f639a4 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.FunctionDescriptor; import 
org.apache.flink.table.api.ValidationException; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.FunctionDefinition; @@ -115,15 +116,14 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)).isEmpty(); // test catalog function is found - catalog.createFunction( - IDENTIFIER.toObjectPath(), - new CatalogFunctionImpl(FUNCTION_1.getClass().getName()), - false); + final CatalogFunctionImpl catalogFunction = + new CatalogFunctionImpl(FUNCTION_1.getClass().getName()); + catalog.createFunction(IDENTIFIER.toObjectPath(), catalogFunction, false); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, catalogFunction)); // test temp catalog function is found functionCatalog.registerTemporaryCatalogFunction( @@ -132,7 +132,9 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_2)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_2, + new FunctionCatalog.InlineCatalogFunction(FUNCTION_2))); } @Test @@ -141,15 +143,14 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)).isEmpty(); // test catalog function is found - catalog.createFunction( - IDENTIFIER.toObjectPath(), - new CatalogFunctionImpl(FUNCTION_1.getClass().getName()), - false); + final CatalogFunctionImpl function = + new CatalogFunctionImpl(FUNCTION_1.getClass().getName()); + catalog.createFunction(IDENTIFIER.toObjectPath(), function, false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, function)); // test temporary catalog function is found functionCatalog.registerTemporaryCatalogFunction( @@ -158,7 +159,9 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_2)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_2, + new FunctionCatalog.InlineCatalogFunction(FUNCTION_2))); // test system function is found moduleManager.loadModule("test_module", new TestModule()); @@ -230,14 +233,22 @@ class FunctionCatalogTest { void testUninstantiatedTemporarySystemFunction() { // register first time functionCatalog.registerTemporarySystemFunction( - NAME, FUNCTION_1.getClass().getName(), FunctionLanguage.JAVA, false); + NAME, + FunctionDescriptor.forClassName(FUNCTION_1.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), + false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary(FunctionIdentifier.of(NAME), FUNCTION_1)); // register second time lenient functionCatalog.registerTemporarySystemFunction( - NAME, FUNCTION_2.getClass().getName(), FunctionLanguage.JAVA, true); + NAME, + FunctionDescriptor.forClassName(FUNCTION_2.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), + true); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary(FunctionIdentifier.of(NAME), FUNCTION_1)); @@ -247,8 +258,10 @@ class FunctionCatalogTest { () -> 
functionCatalog.registerTemporarySystemFunction( NAME, - FUNCTION_2.getClass().getName(), - FunctionLanguage.JAVA, + FunctionDescriptor.forClassName( + FUNCTION_2.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), false)) .satisfies( anyCauseMatches( @@ -260,8 +273,10 @@ class FunctionCatalogTest { () -> functionCatalog.registerTemporarySystemFunction( NAME, - FUNCTION_INVALID.getClass().getName(), - FunctionLanguage.JAVA, + FunctionDescriptor.forClassName( + FUNCTION_INVALID.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), false)) .satisfies( anyCauseMatches( @@ -274,7 +289,11 @@ class FunctionCatalogTest { // test register uninstantiated table function functionCatalog.registerTemporarySystemFunction( - NAME, TABLE_FUNCTION.getClass().getName(), FunctionLanguage.JAVA, false); + NAME, + FunctionDescriptor.forClassName(TABLE_FUNCTION.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), + false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( @@ -284,7 +303,11 @@ class FunctionCatalogTest { // test register uninstantiated aggregate function functionCatalog.registerTemporarySystemFunction( - NAME, AGGREGATE_FUNCTION.getClass().getName(), FunctionLanguage.JAVA, false); + NAME, + FunctionDescriptor.forClassName(AGGREGATE_FUNCTION.getClass().getName()) + .language(FunctionLanguage.JAVA) + .build(), + false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( @@ -295,26 +318,37 @@ class FunctionCatalogTest { void testCatalogFunction() { // register first time functionCatalog.registerCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_1.getClass(), false); + PARTIAL_UNRESOLVED_IDENTIFIER, + FunctionDescriptor.forFunctionClass(FUNCTION_1.getClass()).build(), + false); + final CatalogFunctionImpl catalogFunction = + new CatalogFunctionImpl( + FUNCTION_1.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.emptyMap()); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, catalogFunction)); // register second time lenient functionCatalog.registerCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), true); + PARTIAL_UNRESOLVED_IDENTIFIER, + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(), + true); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, catalogFunction)); // register second time not lenient assertThatThrownBy( () -> functionCatalog.registerCatalogFunction( PARTIAL_UNRESOLVED_IDENTIFIER, - FUNCTION_2.getClass(), + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()) + .build(), false)) .satisfies( anyCauseMatches( @@ -349,25 +383,56 @@ class FunctionCatalogTest { () -> functionCatalog.registerCatalogFunction( PARTIAL_UNRESOLVED_IDENTIFIER, - FUNCTION_INVALID.getClass(), + FunctionDescriptor.forFunctionClass( + FUNCTION_INVALID.getClass()) + .build(), false)) .satisfies( anyCauseMatches( ValidationException.class, - "Could not register catalog function '" - + IDENTIFIER.asSummaryString() + "Can not create a function '" + + FUNCTION_INVALID.getClass().getName() + "' due to implementation 
errors.")); } + @Test + void testCatalogFunctionWithOptions() { + // register permanent function + functionCatalog.registerCatalogFunction( + PARTIAL_UNRESOLVED_IDENTIFIER, + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()) + .option("key1", "option1") + .build(), + false); + assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) + .hasValue( + ContextResolvedFunction.permanent( + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_2, + new CatalogFunctionImpl( + FUNCTION_2.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.singletonMap("key1", "option1")))); + } + @Test void testTemporaryCatalogFunction() { // register permanent function functionCatalog.registerCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), false); + PARTIAL_UNRESOLVED_IDENTIFIER, + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(), + false); + final CatalogFunctionImpl catalogFunction = + new CatalogFunctionImpl( + FUNCTION_2.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.emptyMap()); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_2)); + FunctionIdentifier.of(IDENTIFIER), FUNCTION_2, catalogFunction)); // register temporary first time functionCatalog.registerTemporaryCatalogFunction( @@ -375,8 +440,10 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), - FUNCTION_1)); // temporary function hides catalog function + FunctionIdentifier.of( + IDENTIFIER), // temporary function hides catalog function + FUNCTION_1, + new FunctionCatalog.InlineCatalogFunction(FUNCTION_1))); // dropping catalog functions is not possible in this state assertThatThrownBy( @@ -396,7 +463,8 @@ class FunctionCatalogTest { () -> functionCatalog.registerCatalogFunction( PARTIAL_UNRESOLVED_IDENTIFIER, - FUNCTION_2.getClass(), + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()) + .build(), false)) .satisfies( anyCauseMatches( @@ -412,7 +480,9 @@ class FunctionCatalogTest { assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_1, + new FunctionCatalog.InlineCatalogFunction(FUNCTION_1))); // register temporary second time not lenient assertThatThrownBy( @@ -435,7 +505,8 @@ class FunctionCatalogTest { .hasValue( ContextResolvedFunction.permanent( FunctionIdentifier.of(IDENTIFIER), - FUNCTION_2)); // permanent function is visible again + FUNCTION_2, + catalogFunction)); // permanent function is visible again // drop temporary second time lenient assertThat( @@ -472,32 +543,42 @@ class FunctionCatalogTest { void testUninstantiatedTemporaryCatalogFunction() { // register permanent function functionCatalog.registerCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), false); + PARTIAL_UNRESOLVED_IDENTIFIER, + FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(), + false); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.permanent( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_2)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_2, + new CatalogFunctionImpl( + FUNCTION_2.getClass().getName(), + FunctionLanguage.JAVA, + 
Collections.emptyList(), + Collections.emptyMap()))); // register temporary first time + final CatalogFunctionImpl temporaryCatalogFunction = + new CatalogFunctionImpl(FUNCTION_1.getClass().getName()); functionCatalog.registerTemporaryCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, - new CatalogFunctionImpl(FUNCTION_1.getClass().getName()), - false); + PARTIAL_UNRESOLVED_IDENTIFIER, temporaryCatalogFunction, false); // temporary function hides catalog function assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_1, + temporaryCatalogFunction)); // register temporary second time lenient functionCatalog.registerTemporaryCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, - new CatalogFunctionImpl(FUNCTION_1.getClass().getName()), - true); + PARTIAL_UNRESOLVED_IDENTIFIER, temporaryCatalogFunction, true); assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), FUNCTION_1)); + FunctionIdentifier.of(IDENTIFIER), + FUNCTION_1, + temporaryCatalogFunction)); // register temporary second time not lenient assertThatThrownBy( @@ -534,26 +615,30 @@ class FunctionCatalogTest { functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, true); // test register uninstantiated table function + final CatalogFunctionImpl temporaryTableCatalogFunction = + new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName()); functionCatalog.registerTemporaryCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, - new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName()), - false); + PARTIAL_UNRESOLVED_IDENTIFIER, temporaryTableCatalogFunction, false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), TABLE_FUNCTION)); + FunctionIdentifier.of(IDENTIFIER), + TABLE_FUNCTION, + temporaryTableCatalogFunction)); functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, true); // test register uninstantiated aggregate function + final CatalogFunctionImpl temporaryAggregateCatalogFunction = + new CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName()); functionCatalog.registerTemporaryCatalogFunction( - PARTIAL_UNRESOLVED_IDENTIFIER, - new CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName()), - false); + PARTIAL_UNRESOLVED_IDENTIFIER, temporaryAggregateCatalogFunction, false); assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)) .hasValue( ContextResolvedFunction.temporary( - FunctionIdentifier.of(IDENTIFIER), AGGREGATE_FUNCTION)); + FunctionIdentifier.of(IDENTIFIER), + AGGREGATE_FUNCTION, + temporaryAggregateCatalogFunction)); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java index cb98e51a09e..0b14e7f8de6 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions; import 
org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.FunctionCatalog; @@ -385,7 +386,12 @@ public class ResourceManagerTest { new ModuleManager()); functionCatalog.registerCatalogFunction( - FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, resourceUris, false); + FULL_UNRESOLVED_IDENTIFIER1, + FunctionDescriptor.forClassName(GENERATED_LOWER_UDF_CLASS) + .language(FunctionLanguage.JAVA) + .resourceUris(resourceUris) + .build(), + false); Map<ResourceUri, ResourceManager.ResourceCounter> functionResourceInfos = resourceManager.functionResourceInfos(); @@ -396,7 +402,12 @@ public class ResourceManagerTest { // Register catalog function again to validate that unregister catalog function will not // decrease the reference count of resourceUris. functionCatalog.registerCatalogFunction( - FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, resourceUris, false); + FULL_UNRESOLVED_IDENTIFIER1, + FunctionDescriptor.forClassName(GENERATED_LOWER_UDF_CLASS) + .language(FunctionLanguage.JAVA) + .resourceUris(resourceUris) + .build(), + false); functionCatalog.registerTemporaryCatalogFunction( FULL_UNRESOLVED_IDENTIFIER2, new CatalogFunctionImpl( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java index 090c699e33f..e394db7dd71 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java @@ -21,7 +21,9 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.resource.ResourceUri; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** Interface for a function in a catalog. */ @@ -69,4 +71,9 @@ public interface CatalogFunction { * @return an {@link ResourceUri} list of the function */ List<ResourceUri> getFunctionResources(); + + /** Returns a map of string-based options. 
*/ + default Map<String, String> getOptions() { + return Collections.emptyMap(); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 6c0b31cf10d..269af4f60d0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -558,20 +558,29 @@ public class SqlNodeToOperationConversion { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier()); List<ResourceUri> resourceUris = getFunctionResources(sqlCreateFunction.getResourceInfos()); + final Map<String, String> options = + sqlCreateFunction.getPropertyList().getList().stream() + .map(SqlTableOption.class::cast) + .collect( + Collectors.toMap( + SqlTableOption::getKeyString, + SqlTableOption::getValueString)); if (sqlCreateFunction.isSystemFunction()) { return new CreateTempSystemFunctionOperation( unresolvedIdentifier.getObjectName(), sqlCreateFunction.getFunctionClassName().getValueAs(String.class), sqlCreateFunction.isIfNotExists(), parseLanguage(sqlCreateFunction.getFunctionLanguage()), - resourceUris); + resourceUris, + options); } else { FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); CatalogFunction catalogFunction = new CatalogFunctionImpl( sqlCreateFunction.getFunctionClassName().getValueAs(String.class), language, - resourceUris); + resourceUris, + options); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); return new CreateCatalogFunctionOperation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java index facb4a11158..23b11bd531f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java @@ -492,7 +492,7 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> { assert identifier.getIdentifier().isPresent(); serializeCatalogFunction( identifier.getIdentifier().get(), - resolvedFunction.getDefinition(), + resolvedFunction, gen, serializerProvider, serializeCatalogObjects); @@ -501,7 +501,7 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> { private static void serializeCatalogFunction( ObjectIdentifier objectIdentifier, - FunctionDefinition definition, + ContextResolvedFunction resolvedFunction, JsonGenerator gen, SerializerProvider serializerProvider, boolean serializeCatalogObjects) @@ -512,6 +512,18 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> { return; } + if (resolvedFunction.getCatalogFunction() != null + && !resolvedFunction.getCatalogFunction().getOptions().isEmpty()) { + throw new TableException( + String.format( + "Catalog functions with custom options can not be serialized into the " + + "compiled plan. 
Please set the option '%s'='%s' to only" + + " serialize the function's identifier.", + TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS.key(), + CatalogPlanCompilation.IDENTIFIER)); + } + + final FunctionDefinition definition = resolvedFunction.getDefinition(); if (!(definition instanceof UserDefinedFunction) || !isClassNameSerializable((UserDefinedFunction) definition)) { throw cannotSerializePermanentCatalogFunction(objectIdentifier); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index da91a54060f..f5f4e4d6696 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -1075,7 +1075,7 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas "CREATE TEMPORARY SYSTEM FUNCTION: (functionName: [test_udf2], " + "catalogFunction: [CatalogFunctionImpl{className='org.apache.fink.function.function2', " + "functionLanguage='SCALA', " - + "functionResource='[ResourceUri{resourceType=JAR, uri='file:///path/to/test.jar'}]'}], " + + "functionResource='[ResourceUri{resourceType=JAR, uri='file:///path/to/test.jar'}]', options='{}'}], " + "ignoreIfExists: [false], functionLanguage: [SCALA])"); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 27d8ea979fc..2487e6427a2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; @@ -161,7 +162,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest UnresolvedIdentifier.of( ObjectIdentifier.of( catalogManager.getCurrentCatalog(), "default", "myFunc")), - TableFunc0.class, + FunctionDescriptor.forFunctionClass(TableFunc0.class).build(), true); final String sql = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java index c151cac5a02..76b0066256b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java @@ -20,12 +20,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation; import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore; +import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.ContextResolvedFunction; import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.functions.AsyncScalarFunction; @@ -80,6 +84,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -135,7 +140,23 @@ public class RexNodeJsonSerdeTest { private static final NonSerializableFunctionDefinition NON_SER_FUNCTION_DEF_IMPL = new NonSerializableFunctionDefinition(); private static final ContextResolvedFunction PERMANENT_FUNCTION = - ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL); + ContextResolvedFunction.permanent( + FUNCTION_CAT_ID, + SER_UDF_IMPL, + new CatalogFunctionImpl( + SER_UDF_IMPL.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.emptyMap())); + private static final ContextResolvedFunction PERMANENT_FUNCTION_WITH_OPTIONS = + ContextResolvedFunction.permanent( + FUNCTION_CAT_ID, + SER_UDF_IMPL, + new CatalogFunctionImpl( + SER_UDF_IMPL.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Map.of("option1", "value1", "option2", "value2"))); @ParameterizedTest @MethodSource("testRexNodeSerde") @@ -329,7 +350,32 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( ContextResolvedFunction.permanent( - FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER)); + FUNCTION_CAT_ID, + SER_UDF_IMPL_OTHER, + new CatalogFunctionImpl( + SER_UDF_IMPL_OTHER.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.emptyMap()))); + } + + @Test + void withCatalogFunctionWithOptions() throws Exception { + final SerdeContext serdeContext = serdeContext(compilation, restore); + serdeContext + .getFlinkContext() + .getFunctionCatalog() + .registerCatalogFunction( + UNRESOLVED_FUNCTION_CAT_ID, + FunctionDescriptor.forFunctionClass(SER_UDF_CLASS) + .option("option1", "value1") + .build(), + false); + + testJsonRoundTrip( + serdeContext, + createFunctionCall(serdeContext, PERMANENT_FUNCTION_WITH_OPTIONS), + RexNode.class); } } @@ -367,7 +413,10 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( ContextResolvedFunction.temporary( - FUNCTION_CAT_ID, NON_SER_FUNCTION_DEF_IMPL)); + FUNCTION_CAT_ID, + NON_SER_FUNCTION_DEF_IMPL, + new FunctionCatalog.InlineCatalogFunction( + NON_SER_FUNCTION_DEF_IMPL))); } } @@ -402,7 +451,10 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( ContextResolvedFunction.temporary( - FUNCTION_CAT_ID, NON_SER_FUNCTION_DEF_IMPL)); + FUNCTION_CAT_ID, + NON_SER_FUNCTION_DEF_IMPL, + new FunctionCatalog.InlineCatalogFunction( + NON_SER_FUNCTION_DEF_IMPL))); } @Test @@ -418,7 +470,13 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( 
ContextResolvedFunction.permanent( - FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER)); + FUNCTION_CAT_ID, + SER_UDF_IMPL_OTHER, + new CatalogFunctionImpl( + SER_UDF_IMPL_OTHER.getClass().getName(), + FunctionLanguage.JAVA, + Collections.emptyList(), + Collections.emptyMap()))); } } } @@ -475,7 +533,38 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( ContextResolvedFunction.temporary( - FUNCTION_CAT_ID, NON_SER_FUNCTION_DEF_IMPL)); + FUNCTION_CAT_ID, + NON_SER_FUNCTION_DEF_IMPL, + new FunctionCatalog.InlineCatalogFunction( + NON_SER_FUNCTION_DEF_IMPL))); + } + + @Test + void withCatalogFunctionWithOptions() throws Exception { + final SerdeContext serdeContext = serdeContext(compilation, restore); + serdeContext + .getFlinkContext() + .getFunctionCatalog() + .registerCatalogFunction( + UNRESOLVED_FUNCTION_CAT_ID, + FunctionDescriptor.forFunctionClass(SER_UDF_CLASS) + .option("option1", "value1") + .build(), + false); + + assertThatThrownBy( + () -> + testJsonRoundTrip( + serdeContext, + createFunctionCall( + serdeContext, + PERMANENT_FUNCTION_WITH_OPTIONS), + RexNode.class)) + .hasMessageContaining( + "Catalog functions with custom options can" + + " not be serialized into the compiled plan. Please set the option" + + " 'table.plan.compile.catalog-objects'='IDENTIFIER' to only serialize" + + " the function's identifier."); } } @@ -552,7 +641,10 @@ public class RexNodeJsonSerdeTest { assertThat(actual) .isEqualTo( ContextResolvedFunction.temporary( - FUNCTION_CAT_ID, NON_SER_FUNCTION_DEF_IMPL)); + FUNCTION_CAT_ID, + NON_SER_FUNCTION_DEF_IMPL, + new FunctionCatalog.InlineCatalogFunction( + NON_SER_FUNCTION_DEF_IMPL))); } } } @@ -760,12 +852,17 @@ public class RexNodeJsonSerdeTest { serdeContext .getFlinkContext() .getFunctionCatalog() - .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS, false); + .registerCatalogFunction( + UNRESOLVED_FUNCTION_CAT_ID, + FunctionDescriptor.forFunctionClass(SER_UDF_CLASS).build(), + false); serdeContext .getFlinkContext() .getFunctionCatalog() .registerCatalogFunction( - UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, false); + UNRESOLVED_ASYNC_FUNCTION_CAT_ID, + FunctionDescriptor.forFunctionClass(SER_ASYNC_UDF_CLASS).build(), + false); return serdeContext; } @@ -781,7 +878,10 @@ public class RexNodeJsonSerdeTest { serdeContext .getFlinkContext() .getFunctionCatalog() - .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS_OTHER, false); + .registerCatalogFunction( + UNRESOLVED_FUNCTION_CAT_ID, + FunctionDescriptor.forFunctionClass(SER_UDF_CLASS_OTHER).build(), + false); } private static void registerTemporaryFunction(SerdeContext serdeContext) {
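
For readers skimming the diffstat, a short usage sketch may help put the change in context. It is not part of the commit: the class name 'com.example.MyScalarFunction' and the option keys are placeholders, and the sketch assumes the parser change (CREATE FUNCTION ... WITH (...)) and the new FunctionDescriptor / TableEnvironment#createFunction(String, FunctionDescriptor, boolean) exactly as they appear in the diff above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.FunctionDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.FunctionLanguage;

public class CreateFunctionWithClauseExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // SQL path: the WITH clause added by this commit attaches string options
        // to the catalog function being created.
        tEnv.executeSql(
                "CREATE FUNCTION my_sql_func AS 'com.example.MyScalarFunction' "
                        + "LANGUAGE JAVA WITH ('owner' = 'analytics', 'retry-count' = '3')");

        // Table API path: the same options expressed through the new FunctionDescriptor.
        FunctionDescriptor descriptor =
                FunctionDescriptor.forClassName("com.example.MyScalarFunction")
                        .language(FunctionLanguage.JAVA)
                        .option("owner", "analytics")
                        .option("retry-count", "3")
                        .build();
        tEnv.createFunction("my_api_func", descriptor, true);
    }
}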
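
A second sketch, again not part of the commit, shows how such options can be read back through the Catalog API. It relies on the CatalogFunction#getOptions() default method added in this commit; the class name is the same placeholder as above and is only recorded as metadata here.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.Map;

public class ReadFunctionOptionsExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Create a catalog function that carries options via the new WITH clause.
        tEnv.executeSql(
                "CREATE FUNCTION my_sql_func AS 'com.example.MyScalarFunction' "
                        + "LANGUAGE JAVA WITH ('owner' = 'analytics')");

        // Look the function up in the current catalog and read the options back.
        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
        CatalogFunction function =
                catalog.getFunction(new ObjectPath(tEnv.getCurrentDatabase(), "my_sql_func"));
        Map<String, String> options = function.getOptions();
        System.out.println(options); // e.g. {owner=analytics}
    }
}

Note that, per the RexNodeJsonSerializer change in this commit, catalog functions carrying custom options cannot be serialized into a compiled plan; setting 'table.plan.compile.catalog-objects' to IDENTIFIER serializes only the function's identifier instead.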