This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c1b32be0faa [FLINK-27651][table] Support CREATE FUNCTION USING JAR syntax
c1b32be0faa is described below

commit c1b32be0faa29ecf0b7b358404aba534cba59903
Author: Ron <[email protected]>
AuthorDate: Sun Jun 12 13:10:53 2022 +0800

    [FLINK-27651][table] Support CREATE FUNCTION USING JAR syntax
    
    This closes #19742
---
 .../table/tests/test_catalog_completeness.py       |  4 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 11 +++-
 .../src/main/codegen/data/Parser.tdd               |  2 +
 .../src/main/codegen/includes/parserImpls.ftl      | 76 ++++++++++++++++++---
 .../flink/sql/parser/ddl/SqlCreateFunction.java    | 22 ++++++-
 .../flink/sql/parser/ddl/resource/SqlResource.java | 77 ++++++++++++++++++++++
 .../sql/parser/ddl/resource/SqlResourceType.java   | 48 ++++++++++++++
 .../flink/sql/parser/utils/ParserResource.java     |  3 +
 .../ParserResource.properties                      |  1 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 30 +++++++++
 .../flink/table/catalog/CatalogFunctionImpl.java   | 43 +++++++++++-
 .../flink/table/catalog/FunctionCatalog.java       |  8 +++
 .../ddl/CreateTempSystemFunctionOperation.java     |  8 ++-
 .../flink/table/catalog/CatalogFunction.java       |  9 +++
 .../apache/flink/table/resource/ResourceType.java  | 29 ++++++++
 .../apache/flink/table/resource/ResourceUri.java   | 68 +++++++++++++++++++
 .../functions/UserDefinedFunctionHelperTest.java   |  7 ++
 .../operations/SqlToOperationConverter.java        | 44 ++++++++++++-
 .../operations/SqlToOperationConverterTest.java    | 46 +++++++++++++
 19 files changed, 517 insertions(+), 19 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index fd83e29cfa8..326f2365491 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -96,6 +96,10 @@ class CatalogFunctionAPICompletenessTests(PythonAPICompletenessTestCase, PyFlink
     def java_class(cls):
         return "org.apache.flink.table.catalog.CatalogFunction"
 
+    @classmethod
+    def excluded_methods(cls):
+        return {'getFunctionResources'}
+
 
 class CatalogPartitionAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
     """
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index df11c89b7a6..e4bdae2172f 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1005,8 +1005,15 @@ SqlCreate SqlCreateFunction(Span s, boolean isTemporary) :
         functionClassName = createStringLiteral(token.image, getPos());
     }
     {
-        return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, null,
-                false, isTemporary, false);
+        return new SqlCreateFunction(
+                    s.pos(),
+                    functionIdentifier,
+                    functionClassName,
+                    null,
+                    false,
+                    isTemporary,
+                    false,
+                    SqlNodeList.EMPTY);
     }
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1f5ea19f40f..be27f4fc110 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -26,6 +26,8 @@
     "org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement"
     "org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint"
     "org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec"
+    "org.apache.flink.sql.parser.ddl.resource.SqlResource"
+    "org.apache.flink.sql.parser.ddl.resource.SqlResourceType"
     "org.apache.flink.sql.parser.ddl.SqlAddJar"
     "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
     "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index cfd7c3758e5..10c4f540fd0 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -270,6 +270,8 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
     String functionLanguage = null;
     boolean ifNotExists = false;
     boolean isSystemFunction = false;
+    SqlNodeList resourceInfos = SqlNodeList.EMPTY;
+    SqlParserPos functionLanguagePos = null;
 }
 {
     (
@@ -277,7 +279,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
         {
             if (!isTemporary){
                 throw SqlUtil.newContextException(getPos(),
-                ParserResource.RESOURCE.createSystemFunctionOnlySupportTemporary());
+                    ParserResource.RESOURCE.createSystemFunctionOnlySupportTemporary());
             }
         }
         <FUNCTION>
@@ -294,20 +296,78 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
         String p = SqlParserUtil.parseString(token.image);
         functionClassName = SqlLiteral.createCharString(p, getPos());
     }
-    [<LANGUAGE>
+    [
+        <LANGUAGE>
         (
-            <JAVA>  { functionLanguage = "JAVA"; }
+            <JAVA> {
+                functionLanguage = "JAVA";
+                functionLanguagePos = getPos();
+            }
         |
-            <SCALA> { functionLanguage = "SCALA"; }
+            <SCALA> {
+                functionLanguage = "SCALA";
+                functionLanguagePos = getPos();
+            }
         |
-            <SQL>   { functionLanguage = "SQL"; }
+            <SQL> {
+                functionLanguage = "SQL";
+                functionLanguagePos = getPos();
+            }
         |
-            <PYTHON>   { functionLanguage = "PYTHON"; }
+            <PYTHON> {
+                functionLanguage = "PYTHON";
+                functionLanguagePos = getPos();
+            }
         )
     ]
+    [
+        <USING> {
+            if ("SQL".equals(functionLanguage) || "PYTHON".equals(functionLanguage)) {
+                throw SqlUtil.newContextException(
+                    functionLanguagePos,
+                    ParserResource.RESOURCE.createFunctionUsingJar(functionLanguage));
+            }
+            List<SqlNode> resourceList = new ArrayList<SqlNode>();
+            SqlResource sqlResource = null;
+        }
+        sqlResource = SqlResourceInfo() {
+            resourceList.add(sqlResource);
+        }
+        (
+            <COMMA>
+            sqlResource = SqlResourceInfo() {
+                resourceList.add(sqlResource);
+            }
+        )*
+        {  resourceInfos = new SqlNodeList(resourceList, s.pos()); }
+    ]
     {
-        return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage,
-                ifNotExists, isTemporary, isSystemFunction);
+        return new SqlCreateFunction(
+                    s.pos(),
+                    functionIdentifier,
+                    functionClassName,
+                    functionLanguage,
+                    ifNotExists,
+                    isTemporary,
+                    isSystemFunction,
+                    resourceInfos);
+    }
+}
+
+/**
+ * Parse resource type and path.
+ */
+SqlResource SqlResourceInfo() :
+{
+    String resourcePath;
+}
+{
+    <JAR> <QUOTED_STRING> {
+        resourcePath = SqlParserUtil.parseString(token.image);
+        return new SqlResource(
+                    getPos(),
+                    SqlResourceType.JAR.symbol(getPos()),
+                    SqlLiteral.createCharString(resourcePath, getPos()));
     }
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
index 179b790095a..252ef5f6864 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
@@ -23,6 +23,7 @@ import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
@@ -51,6 +52,8 @@ public class SqlCreateFunction extends SqlCreate {
 
     private final boolean isSystemFunction;
 
+    private final SqlNodeList resourceInfos;
+
     public SqlCreateFunction(
             SqlParserPos pos,
             SqlIdentifier functionIdentifier,
@@ -58,13 +61,15 @@ public class SqlCreateFunction extends SqlCreate {
             String functionLanguage,
             boolean ifNotExists,
             boolean isTemporary,
-            boolean isSystemFunction) {
+            boolean isSystemFunction,
+            SqlNodeList resourceInfos) {
         super(OPERATOR, pos, false, ifNotExists);
         this.functionIdentifier = requireNonNull(functionIdentifier);
         this.functionClassName = requireNonNull(functionClassName);
         this.isSystemFunction = isSystemFunction;
         this.isTemporary = isTemporary;
         this.functionLanguage = functionLanguage;
+        this.resourceInfos = resourceInfos;
     }
 
     @Override
@@ -75,7 +80,7 @@ public class SqlCreateFunction extends SqlCreate {
     @Nonnull
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(functionIdentifier, functionClassName);
+        return ImmutableNullableList.of(functionIdentifier, functionClassName, resourceInfos);
     }
 
     @Override
@@ -98,6 +103,15 @@ public class SqlCreateFunction extends SqlCreate {
             writer.keyword("LANGUAGE");
             writer.keyword(functionLanguage);
         }
+        if (resourceInfos.size() > 0) {
+            writer.keyword("USING");
+            SqlWriter.Frame withFrame = writer.startList("", "");
+            for (SqlNode resourcePath : resourceInfos) {
+                writer.sep(",");
+                resourcePath.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.endList(withFrame);
+        }
     }
 
     public boolean isIfNotExists() {
@@ -123,4 +137,8 @@ public class SqlCreateFunction extends SqlCreate {
     public String[] getFunctionIdentifier() {
         return functionIdentifier.names.toArray(new String[0]);
     }
+
+    public List<SqlNode> getResourceInfos() {
+        return resourceInfos.getList();
+    }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java
new file mode 100644
index 00000000000..77cf4f52af8
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResource.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.resource;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/** SqlNode to describe resource type and its path information. */
+public class SqlResource extends SqlCall {
+
+    private static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("SqlResource", SqlKind.OTHER);
+
+    private final SqlLiteral resourceType;
+    private final SqlCharStringLiteral resourcePath;
+
+    public SqlResource(
+            SqlParserPos pos, SqlLiteral resourceType, SqlCharStringLiteral resourcePath) {
+        super(pos);
+        this.resourceType = resourceType;
+        this.resourcePath = resourcePath;
+    }
+
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(resourceType, resourcePath);
+    }
+
+    public SqlLiteral getResourceType() {
+        return resourceType;
+    }
+
+    public SqlCharStringLiteral getResourcePath() {
+        return resourcePath;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        resourceType.unparse(writer, leftPrec, rightPrec);
+        resourcePath.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
new file mode 100644
index 00000000000..87aaea34fc1
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.resource;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of SQL resource type specification. */
+public enum SqlResourceType {
+    FILE("FILE"),
+    JAR("JAR"),
+    ARCHIVE("ARCHIVE");
+
+    private final String digest;
+
+    SqlResourceType(String digest) {
+        this.digest = digest;
+    }
+
+    @Override
+    public String toString() {
+        return this.digest;
+    }
+
+    /**
+     * Creates a parse-tree node representing an occurrence of this keyword at a particular position
+     * in the parsed text.
+     */
+    public SqlLiteral symbol(SqlParserPos pos) {
+        return SqlLiteral.createSymbol(this, pos);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 433a6884251..8fa96b90ae2 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -40,4 +40,7 @@ public interface ParserResource {
 
     @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
     Resources.ExInst<ParseException> explainDetailIsDuplicate();
+
+    @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable to {0} language.")
+    Resources.ExInst<ParseException> createFunctionUsingJar(String language);
 }
diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index c43d94f3a95..9767c397de7 100644
--- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -20,3 +20,4 @@ MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement.
 createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.
 explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed.
+createFunctionUsingJar=CREATE FUNCTION USING JAR syntax is not applicable to {0} language.
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index d8695754b6b..7c00f15ec75 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1314,6 +1314,36 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                         "CREATE SYSTEM FUNCTION is not supported, "
                                 + "system functions can only be registered as temporary "
                                 + "function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.");
+
+        // test create function using jar
+        sql("create temporary function function1 as 'org.apache.fink.function.function1' language java using jar 'file:///path/to/test.jar'")
+                .ok(
+                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar'");
+
+        sql("create temporary function function1 as 'org.apache.fink.function.function1' language scala using jar '/path/to/test.jar'")
+                .ok(
+                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar'");
+
+        sql("create temporary system function function1 as 'org.apache.fink.function.function1' language scala using jar '/path/to/test.jar'")
+                .ok(
+                        "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA USING JAR '/path/to/test.jar'");
+
+        sql("create function function1 as 'org.apache.fink.function.function1' language java using jar 'file:///path/to/test.jar', jar 'hdfs:///path/to/test2.jar'")
+                .ok(
+                        "CREATE FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
+
+        sql("create temporary function function1 as 'org.apache.fink.function.function1' language ^sql^ using jar 'file:///path/to/test.jar'")
+                .fails("CREATE FUNCTION USING JAR syntax is not applicable to SQL language.");
+
+        sql("create temporary function function1 as 'org.apache.fink.function.function1' language ^python^ using jar 'file:///path/to/test.jar'")
+                .fails("CREATE FUNCTION USING JAR syntax is not applicable to PYTHON language.");
+
+        sql("create temporary function function1 as 'org.apache.fink.function.function1' language java using ^file^ 'file:///path/to/test'")
+                .fails(
+                        "Encountered \"file\" at line 1, column 97.\n"
+                                + "Was expecting:\n"
+                                + "    \"JAR\" ...\n"
+                                + "    .*");
     }
 
     @Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
index d565bed7e77..6ac0df2a93b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
@@ -19,8 +19,12 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.StringUtils;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -30,17 +34,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class CatalogFunctionImpl implements CatalogFunction {
     private final String className; // Fully qualified class name of the function
     private final FunctionLanguage functionLanguage;
+    private final List<ResourceUri> resourceUris;
 
     public CatalogFunctionImpl(String className) {
-        this(className, FunctionLanguage.JAVA);
+        this(className, FunctionLanguage.JAVA, Collections.emptyList());
     }
 
     public CatalogFunctionImpl(String className, FunctionLanguage functionLanguage) {
+        this(className, functionLanguage, Collections.emptyList());
+    }
+
+    public CatalogFunctionImpl(
+            String className, FunctionLanguage functionLanguage, List<ResourceUri> resourceUris) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(className),
                 "className cannot be null or empty");
         this.className = className;
         this.functionLanguage = checkNotNull(functionLanguage, "functionLanguage cannot be null");
+        this.resourceUris = resourceUris;
     }
 
     @Override
@@ -50,7 +61,8 @@ public class CatalogFunctionImpl implements CatalogFunction {
 
     @Override
     public CatalogFunction copy() {
-        return new CatalogFunctionImpl(getClassName(), functionLanguage);
+        return new CatalogFunctionImpl(
+                getClassName(), functionLanguage, Collections.unmodifiableList(resourceUris));
     }
 
     @Override
@@ -85,6 +97,30 @@ public class CatalogFunctionImpl implements CatalogFunction {
         return functionLanguage;
     }
 
+    @Override
+    public List<ResourceUri> getFunctionResources() {
+        return resourceUris;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogFunctionImpl that = (CatalogFunctionImpl) o;
+        return Objects.equals(className, that.className)
+                && functionLanguage == that.functionLanguage
+                && Objects.equals(resourceUris, that.resourceUris);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(className, functionLanguage, resourceUris);
+    }
+
     @Override
     public String toString() {
         return "CatalogFunctionImpl{"
@@ -93,6 +129,9 @@ public class CatalogFunctionImpl implements CatalogFunction {
                 + "', "
                 + "functionLanguage='"
                 + getFunctionLanguage()
+                + "', "
+                + "functionResource='"
+                + getFunctionResources()
                 + "'}";
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index b7bb95f31aa..bd624a37b7d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -40,10 +40,13 @@ import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -713,6 +716,11 @@ public final class FunctionCatalog {
             return FunctionLanguage.JAVA;
         }
 
+        @Override
+        public List<ResourceUri> getFunctionResources() {
+            return Collections.emptyList();
+        }
+
         public FunctionDefinition getDefinition() {
             return definition;
         }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
index 920ff12a0c2..281bafeee5d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.resource.ResourceUri;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Operation to describe a CREATE FUNCTION statement for temporary system function. */
@@ -40,10 +42,12 @@ public class CreateTempSystemFunctionOperation implements CreateOperation {
             String functionName,
             String functionClass,
             boolean ignoreIfExists,
-            FunctionLanguage functionLanguage) {
+            FunctionLanguage functionLanguage,
+            List<ResourceUri> resourceUris) {
         this.functionName = functionName;
         this.ignoreIfExists = ignoreIfExists;
-        this.catalogFunction = new CatalogFunctionImpl(functionClass, functionLanguage);
+        this.catalogFunction =
+                new CatalogFunctionImpl(functionClass, functionLanguage, resourceUris);
     }
 
     public CreateTempSystemFunctionOperation(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
index 7c65dc17904..9e812020f3b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.resource.ResourceUri;
 
+import java.util.List;
 import java.util.Optional;
 
 /** Interface for a function in a catalog. */
@@ -70,4 +72,11 @@ public interface CatalogFunction {
      * @return the language type of the function definition
      */
     FunctionLanguage getFunctionLanguage();
+
+    /**
+     * Get a detailed resource description of the function.
+     *
+     * @return an {@link ResourceUri} list of the function
+     */
+    List<ResourceUri> getFunctionResources();
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
new file mode 100644
index 00000000000..15bbdd6656e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.resource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** An enum that represents the type of resource. */
+@PublicEvolving
+public enum ResourceType {
+    FILE,
+    JAR,
+    ARCHIVE
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java
new file mode 100644
index 00000000000..56c1277cf31
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceUri.java
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.resource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/**
+ * Describes a single function resource: the kind of resource together with the URI string that
+ * identifies it.
+ */
+@PublicEvolving
+public class ResourceUri {
+
+    private final ResourceType resourceType;
+    private final String uri;
+
+    public ResourceUri(ResourceType resourceType, String uri) {
+        this.resourceType = resourceType;
+        this.uri = uri;
+    }
+
+    /** Returns the type of this resource. */
+    public ResourceType getResourceType() {
+        return resourceType;
+    }
+
+    /** Returns the URI string of this resource. */
+    public String getUri() {
+        return uri;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || !getClass().equals(o.getClass())) {
+            return false;
+        }
+        final ResourceUri other = (ResourceUri) o;
+        return Objects.equals(uri, other.uri) && resourceType == other.resourceType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceType, uri);
+    }
+
+    @Override
+    public String toString() {
+        return "ResourceUri{"
+                + "resourceType="
+                + resourceType
+                + ", uri='"
+                + uri
+                + '\''
+                + '}';
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
index ce0b3cb1861..80fb05809a1 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -229,6 +231,11 @@ class UserDefinedFunctionHelperTest {
         public FunctionLanguage getFunctionLanguage() {
             return FunctionLanguage.JAVA;
         }
+
+        /** Always returns an empty list: no resources are attached here. */
+        @Override
+        public List<ResourceUri> getFunctionResources() {
+            return Collections.emptyList();
+        }
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index ef3eb9d4efe..25e2e53d03e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -55,6 +55,8 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.ddl.SqlUseModules;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.resource.SqlResource;
+import org.apache.flink.sql.parser.ddl.resource.SqlResourceType;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.sql.parser.dml.SqlBeginStatementSet;
 import org.apache.flink.sql.parser.dml.SqlCompileAndExecutePlan;
@@ -168,6 +170,8 @@ import 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.StringUtils;
 
@@ -652,19 +656,21 @@ public class SqlToOperationConverter {
     private Operation convertCreateFunction(SqlCreateFunction 
sqlCreateFunction) {
         UnresolvedIdentifier unresolvedIdentifier =
                 
UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier());
-
+        List<ResourceUri> resourceUris = 
getFunctionResources(sqlCreateFunction.getResourceInfos());
         if (sqlCreateFunction.isSystemFunction()) {
             return new CreateTempSystemFunctionOperation(
                     unresolvedIdentifier.getObjectName(),
                     
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
                     sqlCreateFunction.isIfNotExists(),
-                    parseLanguage(sqlCreateFunction.getFunctionLanguage()));
+                    parseLanguage(sqlCreateFunction.getFunctionLanguage()),
+                    resourceUris);
         } else {
             FunctionLanguage language = 
parseLanguage(sqlCreateFunction.getFunctionLanguage());
             CatalogFunction catalogFunction =
                     new CatalogFunctionImpl(
                             
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
-                            language);
+                            language,
+                            resourceUris);
             ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
             return new CreateCatalogFunctionOperation(
@@ -675,6 +681,38 @@ public class SqlToOperationConverter {
         }
     }
 
+    /**
+     * Converts the resource nodes of a CREATE FUNCTION statement into {@link ResourceUri}s.
+     *
+     * @param sqlResources resource nodes of the statement; every element is cast to {@link
+     *     SqlResource}
+     * @return one {@link ResourceUri} per node, in the order of the input list
+     * @throws ValidationException if a node declares a resource type without a matching {@link
+     *     ResourceType}
+     */
+    private List<ResourceUri> getFunctionResources(List<SqlNode> sqlResources) {
+        return sqlResources.stream()
+                .map(SqlResource.class::cast)
+                .map(
+                        sqlResource -> {
+                            // get resource type
+                            SqlResourceType sqlResourceType =
+                                    sqlResource.getResourceType().getValueAs(SqlResourceType.class);
+                            ResourceType resourceType;
+                            switch (sqlResourceType) {
+                                case FILE:
+                                    resourceType = ResourceType.FILE;
+                                    break;
+                                case JAR:
+                                    resourceType = ResourceType.JAR;
+                                    break;
+                                case ARCHIVE:
+                                    resourceType = ResourceType.ARCHIVE;
+                                    break;
+                                default:
+                                    // The placeholder is required: without it String.format
+                                    // silently drops the argument and the message names no type.
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "Unsupported resource type: %s.",
+                                                    sqlResourceType));
+                            }
+                            // get resource path
+                            String path = sqlResource.getResourcePath().getValueAs(String.class);
+                            return new ResourceUri(resourceType, path);
+                        })
+                .collect(Collectors.toList());
+    }
+
     /** Convert ALTER FUNCTION statement. */
     private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) {
         if (sqlAlterFunction.isSystemFunction()) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 39a502b1f09..198a28ea536 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -81,8 +82,10 @@ import 
org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
@@ -96,6 +99,8 @@ import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.parse.ExtendedParser;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
@@ -1213,6 +1218,47 @@ public class SqlToOperationConverterTest {
         assertThat(actualSchema).isEqualTo(expectedSchema);
     }
 
+    /**
+     * Checks the conversion of {@code CREATE FUNCTION ... USING JAR} for both a catalog function
+     * and a temporary system function, including that the declared JAR shows up as a {@link
+     * ResourceUri} on the resulting function.
+     */
+    @Test
+    public void testCreateFunction() {
+        // test create catalog function
+        String sql =
+                "CREATE FUNCTION test_udf AS 
'org.apache.fink.function.function1' "
+                        + "LANGUAGE JAVA USING JAR 'file:///path/to/test.jar'";
+        final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, 
getParserBySqlDialect(SqlDialect.DEFAULT));
+        
assertThat(operation).isInstanceOf(CreateCatalogFunctionOperation.class);
+        CatalogFunction actualFunction =
+                ((CreateCatalogFunctionOperation) 
operation).getCatalogFunction();
+
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE CATALOG FUNCTION: (catalogFunction: 
[Optional[This is a user-defined function]], "
+                                + "identifier: 
[`builtin`.`default`.`test_udf`], ignoreIfExists: [false], isTemporary: 
[false])");
+
+        // the converted function must carry class name, language and the JAR resource
+        CatalogFunction expected =
+                new CatalogFunctionImpl(
+                        "org.apache.fink.function.function1",
+                        FunctionLanguage.JAVA,
+                        Collections.singletonList(
+                                new ResourceUri(ResourceType.JAR, 
"file:///path/to/test.jar")));
+        assertThat(actualFunction).isEqualTo(expected);
+
+        // test create temporary system function
+        sql =
+                "CREATE TEMPORARY SYSTEM FUNCTION test_udf2 AS 
'org.apache.fink.function.function2' "
+                        + "LANGUAGE SCALA USING JAR 
'file:///path/to/test.jar'";
+        operation = parse(sql, planner, 
getParserBySqlDialect(SqlDialect.DEFAULT));
+
+        
assertThat(operation).isInstanceOf(CreateTempSystemFunctionOperation.class);
+        // the summary string is expected to include the function resources
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE TEMPORARY SYSTEM FUNCTION: (functionName: 
[test_udf2], "
+                                + "catalogFunction: 
[CatalogFunctionImpl{className='org.apache.fink.function.function2', "
+                                + "functionLanguage='SCALA', "
+                                + 
"functionResource='[ResourceUri{resourceType=JAR, 
uri='file:///path/to/test.jar'}]'}], "
+                                + "ignoreIfExists: [false], functionLanguage: 
[SCALA])");
+    }
+
     @Test
     public void testAlterTable() throws Exception {
         prepareNonManagedTable(false);

Reply via email to