This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c1b32be0faa [FLINK-27651][table] Support CREATE FUNCTION USING JAR
syntax
c1b32be0faa is described below
commit c1b32be0faa29ecf0b7b358404aba534cba59903
Author: Ron <[email protected]>
AuthorDate: Sun Jun 12 13:10:53 2022 +0800
[FLINK-27651][table] Support CREATE FUNCTION USING JAR syntax
This closes #19742
---
.../table/tests/test_catalog_completeness.py | 4 ++
.../src/main/codegen/includes/parserImpls.ftl | 11 +++-
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 76 ++++++++++++++++++---
.../flink/sql/parser/ddl/SqlCreateFunction.java | 22 ++++++-
.../flink/sql/parser/ddl/resource/SqlResource.java | 77 ++++++++++++++++++++++
.../sql/parser/ddl/resource/SqlResourceType.java | 48 ++++++++++++++
.../flink/sql/parser/utils/ParserResource.java | 3 +
.../ParserResource.properties | 1 +
.../flink/sql/parser/FlinkSqlParserImplTest.java | 30 +++++++++
.../flink/table/catalog/CatalogFunctionImpl.java | 43 +++++++++++-
.../flink/table/catalog/FunctionCatalog.java | 8 +++
.../ddl/CreateTempSystemFunctionOperation.java | 8 ++-
.../flink/table/catalog/CatalogFunction.java | 9 +++
.../apache/flink/table/resource/ResourceType.java | 29 ++++++++
.../apache/flink/table/resource/ResourceUri.java | 68 +++++++++++++++++++
.../functions/UserDefinedFunctionHelperTest.java | 7 ++
.../operations/SqlToOperationConverter.java | 44 ++++++++++++-
.../operations/SqlToOperationConverterTest.java | 46 +++++++++++++
19 files changed, 517 insertions(+), 19 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py
b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index fd83e29cfa8..326f2365491 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -96,6 +96,10 @@ class
CatalogFunctionAPICompletenessTests(PythonAPICompletenessTestCase, PyFlink
def java_class(cls):
return "org.apache.flink.table.catalog.CatalogFunction"
+ @classmethod
+ def excluded_methods(cls):
+ return {'getFunctionResources'}
+
class CatalogPartitionAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
"""
diff --git
a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index df11c89b7a6..e4bdae2172f 100644
---
a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++
b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1005,8 +1005,15 @@ SqlCreate SqlCreateFunction(Span s, boolean isTemporary)
:
functionClassName = createStringLiteral(token.image, getPos());
}
{
- return new SqlCreateFunction(s.pos(), functionIdentifier,
functionClassName, null,
- false, isTemporary, false);
+ return new SqlCreateFunction(
+ s.pos(),
+ functionIdentifier,
+ functionClassName,
+ null,
+ false,
+ isTemporary,
+ false,
+ SqlNodeList.EMPTY);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1f5ea19f40f..be27f4fc110 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -26,6 +26,8 @@
"org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement"
"org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint"
"org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec"
+ "org.apache.flink.sql.parser.ddl.resource.SqlResource"
+ "org.apache.flink.sql.parser.ddl.resource.SqlResourceType"
"org.apache.flink.sql.parser.ddl.SqlAddJar"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index cfd7c3758e5..10c4f540fd0 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -270,6 +270,8 @@ SqlCreate SqlCreateFunction(Span s, boolean replace,
boolean isTemporary) :
String functionLanguage = null;
boolean ifNotExists = false;
boolean isSystemFunction = false;
+ SqlNodeList resourceInfos = SqlNodeList.EMPTY;
+ SqlParserPos functionLanguagePos = null;
}
{
(
@@ -277,7 +279,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace,
boolean isTemporary) :
{
if (!isTemporary){
throw SqlUtil.newContextException(getPos(),
-
ParserResource.RESOURCE.createSystemFunctionOnlySupportTemporary());
+
ParserResource.RESOURCE.createSystemFunctionOnlySupportTemporary());
}
}
<FUNCTION>
@@ -294,20 +296,78 @@ SqlCreate SqlCreateFunction(Span s, boolean replace,
boolean isTemporary) :
String p = SqlParserUtil.parseString(token.image);
functionClassName = SqlLiteral.createCharString(p, getPos());
}
- [<LANGUAGE>
+ [
+ <LANGUAGE>
(
- <JAVA> { functionLanguage = "JAVA"; }
+ <JAVA> {
+ functionLanguage = "JAVA";
+ functionLanguagePos = getPos();
+ }
|
- <SCALA> { functionLanguage = "SCALA"; }
+ <SCALA> {
+ functionLanguage = "SCALA";
+ functionLanguagePos = getPos();
+ }
|
- <SQL> { functionLanguage = "SQL"; }
+ <SQL> {
+ functionLanguage = "SQL";
+ functionLanguagePos = getPos();
+ }
|
- <PYTHON> { functionLanguage = "PYTHON"; }
+ <PYTHON> {
+ functionLanguage = "PYTHON";
+ functionLanguagePos = getPos();
+ }
)
]
+ [
+ <USING> {
+ if ("SQL".equals(functionLanguage) ||
"PYTHON".equals(functionLanguage)) {
+ throw SqlUtil.newContextException(
+ functionLanguagePos,
+
ParserResource.RESOURCE.createFunctionUsingJar(functionLanguage));
+ }
+ List<SqlNode> resourceList = new ArrayList<SqlNode>();
+ SqlResource sqlResource = null;
+ }
+ sqlResource = SqlResourceInfo() {
+ resourceList.add(sqlResource);
+ }
+ (
+ <COMMA>
+ sqlResource = SqlResourceInfo() {
+ resourceList.add(sqlResource);
+ }
+ )*
+ { resourceInfos = new SqlNodeList(resourceList, s.pos()); }
+ ]
{
- return new SqlCreateFunction(s.pos(), functionIdentifier,
functionClassName, functionLanguage,
- ifNotExists, isTemporary, isSystemFunction);
+ return new SqlCreateFunction(
+ s.pos(),
+ functionIdentifier,
+ functionClassName,
+ functionLanguage,
+ ifNotExists,
+ isTemporary,
+ isSystemFunction,
+ resourceInfos);
+ }
+}
+
+/**
+ * Parse resource type and path.
+ */
+SqlResource SqlResourceInfo() :
+{
+ String resourcePath;
+}
+{
+ <JAR> <QUOTED_STRING> {
+ resourcePath = SqlParserUtil.parseString(token.image);
+ return new SqlResource(
+ getPos(),
+ SqlResourceType.JAR.symbol(getPos()),
+ SqlLiteral.createCharString(resourcePath, getPos()));
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
index 179b790095a..252ef5f6864 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
@@ -23,6 +23,7 @@ import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
@@ -51,6 +52,8 @@ public class SqlCreateFunction extends SqlCreate {
private final boolean isSystemFunction;
+ private final SqlNodeList resourceInfos;
+
public SqlCreateFunction(
SqlParserPos pos,
SqlIdentifier functionIdentifier,
@@ -58,13 +61,15 @@ public class SqlCreateFunction extends SqlCreate {
String functionLanguage,
boolean ifNotExists,
boolean isTemporary,
- boolean isSystemFunction) {
+ boolean isSystemFunction,
+ SqlNodeList resourceInfos) {
super(OPERATOR, pos, false, ifNotExists);
this.functionIdentifier = requireNonNull(functionIdentifier);
this.functionClassName = requireNonNull(functionClassName);
this.isSystemFunction = isSystemFunction;
this.isTemporary = isTemporary;
this.functionLanguage = functionLanguage;
+ this.resourceInfos = resourceInfos;
}
@Override
@@ -75,7 +80,7 @@ public class SqlCreateFunction extends SqlCreate {
@Nonnull
@Override
public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(functionIdentifier, functionClassName);
+ return ImmutableNullableList.of(functionIdentifier, functionClassName,
resourceInfos);
}
@Override
@@ -98,6 +103,15 @@ public class SqlCreateFunction extends SqlCreate {
writer.keyword("LANGUAGE");
writer.keyword(functionLanguage);
}
+ if (resourceInfos.size() > 0) {
+ writer.keyword("USING");
+ SqlWriter.Frame withFrame = writer.startList("", "");
+ for (SqlNode resourcePath : resourceInfos) {
+ writer.sep(",");
+ resourcePath.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.endList(withFrame);
+ }
}
public boolean isIfNotExists() {
@@ -123,4 +137,8 @@ public class SqlCreateFunction extends SqlCreate {
public String[] getFunctionIdentifier() {
return functionIdentifier.names.toArray(new String[0]);
}
+
+ public List<SqlNode> getResourceInfos() {
+ return resourceInfos.getList();
+ }
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java
new file mode 100644
index 00000000000..77cf4f52af8
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.resource;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/** SqlNode to describe resource type and its path information. */
+public class SqlResource extends SqlCall {
+
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("SqlResource", SqlKind.OTHER);
+
+ private final SqlLiteral resourceType;
+ private final SqlCharStringLiteral resourcePath;
+
+ public SqlResource(
+ SqlParserPos pos, SqlLiteral resourceType, SqlCharStringLiteral
resourcePath) {
+ super(pos);
+ this.resourceType = resourceType;
+ this.resourcePath = resourcePath;
+ }
+
+ @Nonnull
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(resourceType, resourcePath);
+ }
+
+ public SqlLiteral getResourceType() {
+ return resourceType;
+ }
+
+ public SqlCharStringLiteral getResourcePath() {
+ return resourcePath;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ resourceType.unparse(writer, leftPrec, rightPrec);
+ resourcePath.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
new file mode 100644
index 00000000000..87aaea34fc1
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.resource;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of SQL resource type specification. */
+public enum SqlResourceType {
+ FILE("FILE"),
+ JAR("JAR"),
+ ARCHIVE("ARCHIVE");
+
+ private final String digest;
+
+ SqlResourceType(String digest) {
+ this.digest = digest;
+ }
+
+ @Override
+ public String toString() {
+ return this.digest;
+ }
+
+ /**
+ * Creates a parse-tree node representing an occurrence of this keyword at
a particular position
+ * in the parsed text.
+ */
+ public SqlLiteral symbol(SqlParserPos pos) {
+ return SqlLiteral.createSymbol(this, pos);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 433a6884251..8fa96b90ae2 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -40,4 +40,7 @@ public interface ParserResource {
@Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
Resources.ExInst<ParseException> explainDetailIsDuplicate();
+
+ @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable
to {0} language.")
+ Resources.ExInst<ParseException> createFunctionUsingJar(String language);
}
diff --git
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index c43d94f3a95..9767c397de7 100644
---
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -20,3 +20,4 @@ MultipleWatermarksUnsupported=Multiple WATERMARK statements
is not supported yet
OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT
statement.
createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not
supported, system functions can only be registered as temporary function, you
can use CREATE TEMPORARY SYSTEM FUNCTION instead.
explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed.
+createFunctionUsingJar=CREATE FUNCTION USING JAR syntax is not applicable to
{0} language.
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index d8695754b6b..7c00f15ec75 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1314,6 +1314,36 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"CREATE SYSTEM FUNCTION is not supported, "
+ "system functions can only be registered as
temporary "
+ "function, you can use CREATE TEMPORARY
SYSTEM FUNCTION instead.");
+
+ // test create function using jar
+ sql("create temporary function function1 as
'org.apache.fink.function.function1' language java using jar
'file:///path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar'");
+
+ sql("create temporary function function1 as
'org.apache.fink.function.function1' language scala using jar
'/path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.fink.function.function1' LANGUAGE SCALA USING JAR
'/path/to/test.jar'");
+
+ sql("create temporary system function function1 as
'org.apache.fink.function.function1' language scala using jar
'/path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS
'org.apache.fink.function.function1' LANGUAGE SCALA USING JAR
'/path/to/test.jar'");
+
+ sql("create function function1 as 'org.apache.fink.function.function1'
language java using jar 'file:///path/to/test.jar', jar
'hdfs:///path/to/test2.jar'")
+ .ok(
+ "CREATE FUNCTION `FUNCTION1` AS
'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
+
+ sql("create temporary function function1 as
'org.apache.fink.function.function1' language ^sql^ using jar
'file:///path/to/test.jar'")
+ .fails("CREATE FUNCTION USING JAR syntax is not applicable to
SQL language.");
+
+ sql("create temporary function function1 as
'org.apache.fink.function.function1' language ^python^ using jar
'file:///path/to/test.jar'")
+ .fails("CREATE FUNCTION USING JAR syntax is not applicable to
PYTHON language.");
+
+ sql("create temporary function function1 as
'org.apache.fink.function.function1' language java using ^file^
'file:///path/to/test'")
+ .fails(
+ "Encountered \"file\" at line 1, column 97.\n"
+ + "Was expecting:\n"
+ + " \"JAR\" ...\n"
+ + " .*");
}
@Test
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
index d565bed7e77..6ac0df2a93b 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
@@ -19,8 +19,12 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.StringUtils;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -30,17 +34,24 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
public class CatalogFunctionImpl implements CatalogFunction {
private final String className; // Fully qualified class name of the
function
private final FunctionLanguage functionLanguage;
+ private final List<ResourceUri> resourceUris;
public CatalogFunctionImpl(String className) {
- this(className, FunctionLanguage.JAVA);
+ this(className, FunctionLanguage.JAVA, Collections.emptyList());
}
public CatalogFunctionImpl(String className, FunctionLanguage
functionLanguage) {
+ this(className, functionLanguage, Collections.emptyList());
+ }
+
+ public CatalogFunctionImpl(
+ String className, FunctionLanguage functionLanguage,
List<ResourceUri> resourceUris) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(className),
"className cannot be null or empty");
this.className = className;
this.functionLanguage = checkNotNull(functionLanguage,
"functionLanguage cannot be null");
+ this.resourceUris = resourceUris;
}
@Override
@@ -50,7 +61,8 @@ public class CatalogFunctionImpl implements CatalogFunction {
@Override
public CatalogFunction copy() {
- return new CatalogFunctionImpl(getClassName(), functionLanguage);
+ return new CatalogFunctionImpl(
+ getClassName(), functionLanguage,
Collections.unmodifiableList(resourceUris));
}
@Override
@@ -85,6 +97,30 @@ public class CatalogFunctionImpl implements CatalogFunction {
return functionLanguage;
}
+ @Override
+ public List<ResourceUri> getFunctionResources() {
+ return resourceUris;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogFunctionImpl that = (CatalogFunctionImpl) o;
+ return Objects.equals(className, that.className)
+ && functionLanguage == that.functionLanguage
+ && Objects.equals(resourceUris, that.resourceUris);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(className, functionLanguage, resourceUris);
+ }
+
@Override
public String toString() {
return "CatalogFunctionImpl{"
@@ -93,6 +129,9 @@ public class CatalogFunctionImpl implements CatalogFunction {
+ "', "
+ "functionLanguage='"
+ getFunctionLanguage()
+ + "', "
+ + "functionResource='"
+ + getFunctionResources()
+ "'}";
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index b7bb95f31aa..bd624a37b7d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -40,10 +40,13 @@ import
org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -713,6 +716,11 @@ public final class FunctionCatalog {
return FunctionLanguage.JAVA;
}
+ @Override
+ public List<ResourceUri> getFunctionResources() {
+ return Collections.emptyList();
+ }
+
public FunctionDefinition getDefinition() {
return definition;
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
index 920ff12a0c2..281bafeee5d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.resource.ResourceUri;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/** Operation to describe a CREATE FUNCTION statement for temporary system
function. */
@@ -40,10 +42,12 @@ public class CreateTempSystemFunctionOperation implements
CreateOperation {
String functionName,
String functionClass,
boolean ignoreIfExists,
- FunctionLanguage functionLanguage) {
+ FunctionLanguage functionLanguage,
+ List<ResourceUri> resourceUris) {
this.functionName = functionName;
this.ignoreIfExists = ignoreIfExists;
- this.catalogFunction = new CatalogFunctionImpl(functionClass,
functionLanguage);
+ this.catalogFunction =
+ new CatalogFunctionImpl(functionClass, functionLanguage,
resourceUris);
}
public CreateTempSystemFunctionOperation(
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
index 7c65dc17904..9e812020f3b 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.resource.ResourceUri;
+import java.util.List;
import java.util.Optional;
/** Interface for a function in a catalog. */
@@ -70,4 +72,11 @@ public interface CatalogFunction {
* @return the language type of the function definition
*/
FunctionLanguage getFunctionLanguage();
+
+ /**
+ * Get a detailed resource description of the function.
+ *
+ * @return an {@link ResourceUri} list of the function
+ */
+ List<ResourceUri> getFunctionResources();
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
new file mode 100644
index 00000000000..15bbdd6656e
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.resource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** An enum that represents the type of resource. */
+@PublicEvolving
+public enum ResourceType {
+ FILE,
+ JAR,
+ ARCHIVE
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java
new file mode 100644
index 00000000000..56c1277cf31
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.resource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/** Description of function resource information. */
+@PublicEvolving
+public class ResourceUri {
+
+ private final ResourceType resourceType;
+ private final String uri;
+
+ public ResourceUri(ResourceType resourceType, String uri) {
+ this.resourceType = resourceType;
+ this.uri = uri;
+ }
+
+ /** Get resource type info. */
+ public ResourceType getResourceType() {
+ return resourceType;
+ }
+
+ /** Get resource unique path info. */
+ public String getUri() {
+ return uri;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ResourceUri that = (ResourceUri) o;
+ return resourceType == that.resourceType && Objects.equals(uri,
that.uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resourceType, uri);
+ }
+
+ @Override
+ public String toString() {
+ return "ResourceUri{" + "resourceType=" + resourceType + ", uri='" +
uri + '\'' + '}';
+ }
+}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
index ce0b3cb1861..80fb05809a1 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
@@ -229,6 +231,11 @@ class UserDefinedFunctionHelperTest {
public FunctionLanguage getFunctionLanguage() {
return FunctionLanguage.JAVA;
}
+
+ @Override
+ public List<ResourceUri> getFunctionResources() {
+ return Collections.emptyList();
+ }
}
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index ef3eb9d4efe..25e2e53d03e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -55,6 +55,8 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
import org.apache.flink.sql.parser.ddl.SqlUseModules;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.resource.SqlResource;
+import org.apache.flink.sql.parser.ddl.resource.SqlResourceType;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.sql.parser.dml.SqlBeginStatementSet;
import org.apache.flink.sql.parser.dml.SqlCompileAndExecutePlan;
@@ -168,6 +170,8 @@ import
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.StringUtils;
@@ -652,19 +656,21 @@ public class SqlToOperationConverter {
private Operation convertCreateFunction(SqlCreateFunction
sqlCreateFunction) {
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier());
-
+ List<ResourceUri> resourceUris =
getFunctionResources(sqlCreateFunction.getResourceInfos());
if (sqlCreateFunction.isSystemFunction()) {
return new CreateTempSystemFunctionOperation(
unresolvedIdentifier.getObjectName(),
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
sqlCreateFunction.isIfNotExists(),
- parseLanguage(sqlCreateFunction.getFunctionLanguage()));
+ parseLanguage(sqlCreateFunction.getFunctionLanguage()),
+ resourceUris);
} else {
FunctionLanguage language =
parseLanguage(sqlCreateFunction.getFunctionLanguage());
CatalogFunction catalogFunction =
new CatalogFunctionImpl(
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
- language);
+ language,
+ resourceUris);
ObjectIdentifier identifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
return new CreateCatalogFunctionOperation(
@@ -675,6 +681,38 @@ public class SqlToOperationConverter {
}
}
+ private List<ResourceUri> getFunctionResources(List<SqlNode> sqlResources)
{
+ return sqlResources.stream()
+ .map(SqlResource.class::cast)
+ .map(
+ sqlResource -> {
+ // get resource type
+ SqlResourceType sqlResourceType =
+
sqlResource.getResourceType().getValueAs(SqlResourceType.class);
+ ResourceType resourceType;
+ switch (sqlResourceType) {
+ case FILE:
+ resourceType = ResourceType.FILE;
+ break;
+ case JAR:
+ resourceType = ResourceType.JAR;
+ break;
+ case ARCHIVE:
+ resourceType = ResourceType.ARCHIVE;
+ break;
+ default:
+ throw new ValidationException(
+ String.format(
+ "Unsupported resource
type: .",
+ sqlResourceType));
+ }
+ // get resource path
+ String path =
sqlResource.getResourcePath().getValueAs(String.class);
+ return new ResourceUri(resourceType, path);
+ })
+ .collect(Collectors.toList());
+ }
+
/** Convert ALTER FUNCTION statement. */
private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) {
if (sqlAlterFunction.isSystemFunction()) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 39a502b1f09..198a28ea536 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
@@ -81,8 +82,10 @@ import
org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
@@ -96,6 +99,8 @@ import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.parse.ExtendedParser;
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
@@ -1213,6 +1218,47 @@ public class SqlToOperationConverterTest {
assertThat(actualSchema).isEqualTo(expectedSchema);
}
+ @Test
+ public void testCreateFunction() {
+ // test create catalog function
+ String sql =
+ "CREATE FUNCTION test_udf AS
'org.apache.fink.function.function1' "
+ + "LANGUAGE JAVA USING JAR 'file:///path/to/test.jar'";
+ final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ Operation operation = parse(sql, planner,
getParserBySqlDialect(SqlDialect.DEFAULT));
+
assertThat(operation).isInstanceOf(CreateCatalogFunctionOperation.class);
+ CatalogFunction actualFunction =
+ ((CreateCatalogFunctionOperation)
operation).getCatalogFunction();
+
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "CREATE CATALOG FUNCTION: (catalogFunction:
[Optional[This is a user-defined function]], "
+ + "identifier:
[`builtin`.`default`.`test_udf`], ignoreIfExists: [false], isTemporary:
[false])");
+
+ CatalogFunction expected =
+ new CatalogFunctionImpl(
+ "org.apache.fink.function.function1",
+ FunctionLanguage.JAVA,
+ Collections.singletonList(
+ new ResourceUri(ResourceType.JAR,
"file:///path/to/test.jar")));
+ assertThat(actualFunction).isEqualTo(expected);
+
+ // test create temporary system function
+ sql =
+ "CREATE TEMPORARY SYSTEM FUNCTION test_udf2 AS
'org.apache.fink.function.function2' "
+ + "LANGUAGE SCALA USING JAR
'file:///path/to/test.jar'";
+ operation = parse(sql, planner,
getParserBySqlDialect(SqlDialect.DEFAULT));
+
+
assertThat(operation).isInstanceOf(CreateTempSystemFunctionOperation.class);
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "CREATE TEMPORARY SYSTEM FUNCTION: (functionName:
[test_udf2], "
+ + "catalogFunction:
[CatalogFunctionImpl{className='org.apache.fink.function.function2', "
+ + "functionLanguage='SCALA', "
+ +
"functionResource='[ResourceUri{resourceType=JAR,
uri='file:///path/to/test.jar'}]'}], "
+ + "ignoreIfExists: [false], functionLanguage:
[SCALA])");
+ }
+
@Test
public void testAlterTable() throws Exception {
prepareNonManagedTable(false);