This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5494e91ddb6 [FLINK-38198] Support WITH Clause in CREATE FUNCTION 
Statement (#26874)
5494e91ddb6 is described below

commit 5494e91ddb6d43d6501f6f812afcb6dc8624e089
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Aug 11 09:33:01 2025 +0200

    [FLINK-38198] Support WITH Clause in CREATE FUNCTION Statement (#26874)
---
 docs/content.zh/docs/dev/table/sql/create.md       |   1 +
 docs/content/docs/dev/table/sql/create.md          |   1 +
 flink-python/pyflink/table/catalog.py              |  10 ++
 .../src/main/codegen/includes/parserImpls.ftl      |   8 +-
 .../flink/sql/parser/ddl/SqlCreateFunction.java    |  23 ++-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  14 ++
 .../apache/flink/table/api/FunctionDescriptor.java | 169 ++++++++++++++++++
 .../apache/flink/table/api/TableEnvironment.java   |  63 +++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  51 +++++-
 .../flink/table/catalog/CatalogFunctionImpl.java   |  37 +++-
 .../table/catalog/ContextResolvedFunction.java     |  13 +-
 .../flink/table/catalog/FunctionCatalog.java       |  68 ++++----
 .../ddl/CreateTempSystemFunctionOperation.java     |   5 +-
 .../flink/table/catalog/FunctionCatalogTest.java   | 191 +++++++++++++++------
 .../flink/table/resource/ResourceManagerTest.java  |  15 +-
 .../flink/table/catalog/CatalogFunction.java       |   7 +
 .../operations/SqlNodeToOperationConversion.java   |  13 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |  16 +-
 .../operations/SqlDdlToOperationConverterTest.java |   2 +-
 ...erializedTableNodeToOperationConverterTest.java |   3 +-
 .../nodes/exec/serde/RexNodeJsonSerdeTest.java     | 120 +++++++++++--
 21 files changed, 699 insertions(+), 131 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md 
b/docs/content.zh/docs/dev/table/sql/create.md
index 3c18e3e0128..779ea6e6e72 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -863,6 +863,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
   [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
   AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
   [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+  [WITH (key1=val1, key2=val2, ...)]
 ```
 
 创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 
。 若 catalog 中,已经有同名的函数注册了,则无法注册。
diff --git a/docs/content/docs/dev/table/sql/create.md 
b/docs/content/docs/dev/table/sql/create.md
index 9147b929f8c..10d570a6e34 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -860,6 +860,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
   [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
   AS identifier [LANGUAGE JAVA|SCALA|PYTHON] 
   [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+  [WITH (key1=val1, key2=val2, ...)]
 ```
 
 Create a catalog function that has catalog and database namespaces with the 
identifier and optional language tag. If a function with the same name already 
exists in the catalog, an exception is thrown.
diff --git a/flink-python/pyflink/table/catalog.py 
b/flink-python/pyflink/table/catalog.py
index 888f995d688..8033a95ba2c 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -1118,6 +1118,16 @@ class CatalogFunction(object):
         """
         return self._j_catalog_function.getFunctionLanguage()
 
+    def get_options(self) -> Dict[str, str]:
+        """
+        Returns a map of string-based options.
+
+        :return: Property map of the function.
+
+        .. versionadded:: 2.2.0
+        """
+        return dict(self._j_catalog_function.getOptions())
+
 
 @PublicEvolving()
 class CatalogModel(object):
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 9419d165874..fb2b688b42c 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -396,6 +396,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, 
boolean isTemporary) :
     boolean ifNotExists = false;
     boolean isSystemFunction = false;
     SqlNodeList resourceInfos = SqlNodeList.EMPTY;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlParserPos functionLanguagePos = null;
 }
 {
@@ -466,6 +467,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, 
boolean isTemporary) :
         )*
         {  resourceInfos = new SqlNodeList(resourceList, s.pos()); }
     ]
+    [
+        <WITH>
+        propertyList = Properties()
+    ]
     {
         return new SqlCreateFunction(
                     s.pos(),
@@ -475,7 +480,8 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, 
boolean isTemporary) :
                     ifNotExists,
                     isTemporary,
                     isSystemFunction,
-                    resourceInfos);
+                    resourceInfos,
+                    propertyList);
     }
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
index 252ef5f6864..af40191e9ae 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.sql.parser.ddl;
 
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -53,6 +55,7 @@ public class SqlCreateFunction extends SqlCreate {
     private final boolean isSystemFunction;
 
     private final SqlNodeList resourceInfos;
+    private final SqlNodeList propertyList;
 
     public SqlCreateFunction(
             SqlParserPos pos,
@@ -62,7 +65,8 @@ public class SqlCreateFunction extends SqlCreate {
             boolean ifNotExists,
             boolean isTemporary,
             boolean isSystemFunction,
-            SqlNodeList resourceInfos) {
+            SqlNodeList resourceInfos,
+            SqlNodeList propertyList) {
         super(OPERATOR, pos, false, ifNotExists);
         this.functionIdentifier = requireNonNull(functionIdentifier);
         this.functionClassName = requireNonNull(functionClassName);
@@ -70,6 +74,7 @@ public class SqlCreateFunction extends SqlCreate {
         this.isTemporary = isTemporary;
         this.functionLanguage = functionLanguage;
         this.resourceInfos = resourceInfos;
+        this.propertyList = requireNonNull(propertyList);
     }
 
     @Override
@@ -103,7 +108,7 @@ public class SqlCreateFunction extends SqlCreate {
             writer.keyword("LANGUAGE");
             writer.keyword(functionLanguage);
         }
-        if (resourceInfos.size() > 0) {
+        if (!resourceInfos.isEmpty()) {
             writer.keyword("USING");
             SqlWriter.Frame withFrame = writer.startList("", "");
             for (SqlNode resourcePath : resourceInfos) {
@@ -112,6 +117,16 @@ public class SqlCreateFunction extends SqlCreate {
             }
             writer.endList(withFrame);
         }
+        if (!this.propertyList.isEmpty()) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode property : propertyList) {
+                SqlUnparseUtils.printIndent(writer);
+                property.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
     }
 
     public boolean isIfNotExists() {
@@ -141,4 +156,8 @@ public class SqlCreateFunction extends SqlCreate {
     public List<SqlNode> getResourceInfos() {
         return resourceInfos.getList();
     }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index f84762645b0..c71cc1e15a0 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2491,6 +2491,20 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                 + "Was expecting:\n"
                                 + "    \"JAR\" ...\n"
                                 + "    .*");
+
+        sql("create function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
+                .ok(
+                        "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar' WITH (\n"
+                                + "  'k1' = 'v1',\n"
+                                + "  'k2' = 'v2'\n"
+                                + ")");
+
+        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
+                .ok(
+                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar' WITH (\n"
+                                + "  'k1' = 'v1',\n"
+                                + "  'k2' = 'v2'\n"
+                                + ")");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java
new file mode 100644
index 00000000000..2dad5982e5b
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FunctionDescriptor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes a {@link CatalogFunction}.
+ *
+ * <p>A {@link FunctionDescriptor} is a template for creating a {@link 
CatalogFunction} instance. It
+ * closely resembles the "CREATE FUNCTION" SQL DDL statement.
+ *
+ * <p>This can be used to register a function in the Table API, see {@link
+ * TableEnvironment#createFunction(String, FunctionDescriptor)}.
+ */
+@PublicEvolving
+public class FunctionDescriptor {
+
+    private final String className;
+    private final FunctionLanguage language;
+    private final List<ResourceUri> resourceUris;
+    private final Map<String, String> options;
+
+    private FunctionDescriptor(
+            String className,
+            FunctionLanguage language,
+            List<ResourceUri> resourceUris,
+            Map<String, String> options) {
+        this.className = className;
+        this.language = language;
+        this.resourceUris = resourceUris;
+        this.options = options;
+    }
+
+    /** Creates a {@link Builder} for a function descriptor with the given 
class name. */
+    public static Builder forClassName(String className) {
+        return new Builder(className);
+    }
+
+    /** Creates a {@link Builder} for a function descriptor for the given 
function class. */
+    public static Builder forFunctionClass(Class<? extends 
UserDefinedFunction> functionClass) {
+        try {
+            UserDefinedFunctionHelper.validateClass(functionClass);
+        } catch (Throwable t) {
+            throw new ValidationException(
+                    String.format(
+                            "Can not create a function '%s' due to 
implementation errors.",
+                            functionClass.getName()),
+                    t);
+        }
+        return new 
Builder(functionClass.getName()).language(FunctionLanguage.JAVA);
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public FunctionLanguage getLanguage() {
+        return language;
+    }
+
+    public List<ResourceUri> getResourceUris() {
+        return resourceUris;
+    }
+
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    /** Builder for {@link FunctionDescriptor}. */
+    @PublicEvolving
+    public static final class Builder {
+        private final String className;
+        private FunctionLanguage language = FunctionLanguage.JAVA;
+        private final List<ResourceUri> resourceUris = new ArrayList<>();
+        private final Map<String, String> options = new HashMap<>();
+
+        private Builder(String className) {
+            this.className = className;
+        }
+
+        /**
+         * Sets the language of the function. Equivalent to the {@code 
LANGUAGE} clause in the
+         * "CREATE FUNCTION" SQL DDL statement.
+         */
+        public Builder language(FunctionLanguage language) {
+            Preconditions.checkNotNull(language, "Function language must not 
be null.");
+            this.language = language;
+            return this;
+        }
+
+        /**
+         * Adds a list of resource URIs to the function descriptor. Equivalent 
to the {@code USING}
+         * clause in the "CREATE FUNCTION" SQL DDL statement.
+         */
+        public Builder resourceUris(List<ResourceUri> uri) {
+            Preconditions.checkNotNull(uri, "Resource URIs must not be null.");
+            this.resourceUris.addAll(uri);
+            return this;
+        }
+
+        /**
+         * Adds a single resource URI to the function descriptor. Equivalent 
to the {@code USING}
+         * clause in the "CREATE FUNCTION" SQL DDL statement.
+         */
+        public Builder resourceUri(ResourceUri uri) {
+            Preconditions.checkNotNull(uri, "Resource URI must not be null.");
+            this.resourceUris.add(uri);
+            return this;
+        }
+
+        /**
+         * Adds an option to the function descriptor. Equivalent to the {@code 
WITH} clause in the
+         * "CREATE FUNCTION" SQL DDL statement.
+         */
+        public Builder option(String key, String value) {
+            Preconditions.checkNotNull(key, "Option key must not be null.");
+            Preconditions.checkNotNull(value, "Option value must not be 
null.");
+            this.options.put(key, value);
+            return this;
+        }
+
+        /**
+         * Adds multiple options to the function descriptor. Equivalent to the 
{@code WITH} clause
+         * in the "CREATE FUNCTION" SQL DDL statement.
+         */
+        public Builder options(Map<String, String> options) {
+            Preconditions.checkNotNull(options, "Options must not be null.");
+            this.options.putAll(options);
+            return this;
+        }
+
+        public FunctionDescriptor build() {
+            return new FunctionDescriptor(
+                    className,
+                    language,
+                    Collections.unmodifiableList(resourceUris),
+                    Collections.unmodifiableMap(options));
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 3b514aa4ae4..b0bc8538833 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -561,6 +561,21 @@ public interface TableEnvironment {
      */
     void createFunction(String path, String className, List<ResourceUri> 
resourceUris);
 
+    /**
+     * Creates a catalog function in the given path described by a {@link 
FunctionDescriptor}.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog 
functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) 
registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See 
also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param functionDescriptor The descriptor of the function to create.
+     */
+    void createFunction(String path, FunctionDescriptor functionDescriptor);
+
     /**
      * Registers a {@link UserDefinedFunction} class as a catalog function in 
the given path by the
      * specific class name and user defined resource uri.
@@ -587,6 +602,23 @@ public interface TableEnvironment {
     void createFunction(
             String path, String className, List<ResourceUri> resourceUris, 
boolean ignoreIfExists);
 
+    /**
+     * Creates a catalog function in the given path described by a {@link 
FunctionDescriptor}.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog 
functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) 
registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See 
also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param functionDescriptor The descriptor of the function to create.
+     * @param ignoreIfExists If a function exists under the given path and 
this flag is set, no
+     *     operation is executed. An exception is thrown otherwise.
+     */
+    void createFunction(String path, FunctionDescriptor functionDescriptor, 
boolean ignoreIfExists);
+
     /**
      * Registers a {@link UserDefinedFunction} class as a temporary catalog 
function.
      *
@@ -651,6 +683,24 @@ public interface TableEnvironment {
      */
     void createTemporaryFunction(String path, String className, 
List<ResourceUri> resourceUris);
 
+    /**
+     * Creates a temporary catalog function using a {@link FunctionDescriptor} 
to describe the
+     * function.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, String, 
List)} with a globally
+     * defined name, catalog functions are always (implicitly or explicitly) 
identified by a catalog
+     * and database.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent 
function under a given name
+     * exists, it will be inaccessible in the current session. To make the 
permanent function
+     * available again one can drop the corresponding temporary function.
+     *
+     * @param path The path under which the function will be registered. See 
also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param functionDescriptor The descriptor of the function to create.
+     */
+    void createTemporaryFunction(String path, FunctionDescriptor 
functionDescriptor);
+
     /**
      * Registers a {@link UserDefinedFunction} class as a temporary system 
function by the specific
      * class name and user defined resource uri.
@@ -672,6 +722,19 @@ public interface TableEnvironment {
     void createTemporarySystemFunction(
             String name, String className, List<ResourceUri> resourceUris);
 
+    /**
+     * Creates a temporary system function using a {@link FunctionDescriptor} 
to describe the
+     * function.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent 
function under a given name
+     * exists, it will be inaccessible in the current session. To make the 
permanent function
+     * available again one can drop the corresponding temporary system 
function.
+     *
+     * @param name The name under which the function will be registered 
globally.
+     * @param functionDescriptor The descriptor of the function to create.
+     */
+    void createTemporarySystemFunction(String name, FunctionDescriptor 
functionDescriptor);
+
     /**
      * Drops a catalog function registered in the given path.
      *
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f3c1e2f628f..37017b9df32 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.ModelDescriptor;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.ResultKind;
@@ -417,7 +418,14 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     @Override
     public void createTemporarySystemFunction(
             String name, String className, List<ResourceUri> resourceUris) {
-        functionCatalog.registerTemporarySystemFunction(name, className, 
resourceUris);
+        createTemporarySystemFunction(
+                name,
+                
FunctionDescriptor.forClassName(className).resourceUris(resourceUris).build());
+    }
+
+    @Override
+    public void createTemporarySystemFunction(String name, FunctionDescriptor 
functionDescriptor) {
+        functionCatalog.registerTemporarySystemFunction(name, 
functionDescriptor, false);
     }
 
     @Override
@@ -435,9 +443,8 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
             String path,
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists) {
-        final UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
-        functionCatalog.registerCatalogFunction(
-                unresolvedIdentifier, functionClass, ignoreIfExists);
+        createFunction(
+                path, 
FunctionDescriptor.forFunctionClass(functionClass).build(), ignoreIfExists);
     }
 
     @Override
@@ -448,9 +455,27 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     @Override
     public void createFunction(
             String path, String className, List<ResourceUri> resourceUris, 
boolean ignoreIfExists) {
+        createFunction(
+                path,
+                FunctionDescriptor.forClassName(className)
+                        .language(FunctionLanguage.JAVA)
+                        .resourceUris(resourceUris)
+                        .build(),
+                ignoreIfExists);
+    }
+
+    @Override
+    public void createFunction(String path, FunctionDescriptor 
functionDescriptor) {
+        createFunction(path, functionDescriptor, false);
+    }
+
+    @Override
+    public void createFunction(
+            String path, FunctionDescriptor functionDescriptor, boolean 
ignoreIfExists) {
+
         final UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
         functionCatalog.registerCatalogFunction(
-                unresolvedIdentifier, className, resourceUris, ignoreIfExists);
+                unresolvedIdentifier, functionDescriptor, ignoreIfExists);
     }
 
     @Override
@@ -477,9 +502,23 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     @Override
     public void createTemporaryFunction(
             String path, String className, List<ResourceUri> resourceUris) {
+        createTemporaryFunction(
+                path,
+                FunctionDescriptor.forClassName(className)
+                        .language(FunctionLanguage.JAVA)
+                        .resourceUris(resourceUris)
+                        .build());
+    }
+
+    @Override
+    public void createTemporaryFunction(String path, FunctionDescriptor 
functionDescriptor) {
         final UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
         final CatalogFunction catalogFunction =
-                new CatalogFunctionImpl(className, FunctionLanguage.JAVA, 
resourceUris);
+                new CatalogFunctionImpl(
+                        functionDescriptor.getClassName(),
+                        functionDescriptor.getLanguage(),
+                        functionDescriptor.getResourceUris(),
+                        functionDescriptor.getOptions());
         functionCatalog.registerTemporaryCatalogFunction(
                 unresolvedIdentifier, catalogFunction, false);
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
index d8ad6932580..2a6829cb78d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.StringUtils;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -36,6 +37,7 @@ public class CatalogFunctionImpl implements CatalogFunction {
     private final String className; // Fully qualified class name of the 
function
     private final FunctionLanguage functionLanguage;
     private final List<ResourceUri> resourceUris;
+    private final Map<String, String> options;
 
     public CatalogFunctionImpl(String className) {
         this(className, FunctionLanguage.JAVA, Collections.emptyList());
@@ -47,12 +49,21 @@ public class CatalogFunctionImpl implements CatalogFunction 
{
 
     public CatalogFunctionImpl(
             String className, FunctionLanguage functionLanguage, 
List<ResourceUri> resourceUris) {
+        this(className, functionLanguage, resourceUris, 
Collections.emptyMap());
+    }
+
+    public CatalogFunctionImpl(
+            String className,
+            FunctionLanguage functionLanguage,
+            List<ResourceUri> resourceUris,
+            Map<String, String> options) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(className),
                 "className cannot be null or empty");
         this.className = className;
         this.functionLanguage = checkNotNull(functionLanguage, 
"functionLanguage cannot be null");
         this.resourceUris = resourceUris;
+        this.options = checkNotNull(options, "options cannot be null");
     }
 
     @Override
@@ -63,7 +74,10 @@ public class CatalogFunctionImpl implements CatalogFunction {
     @Override
     public CatalogFunction copy() {
         return new CatalogFunctionImpl(
-                getClassName(), functionLanguage, 
Collections.unmodifiableList(resourceUris));
+                getClassName(),
+                functionLanguage,
+                Collections.unmodifiableList(resourceUris),
+                Collections.unmodifiableMap(options));
     }
 
     @Override
@@ -86,23 +100,31 @@ public class CatalogFunctionImpl implements 
CatalogFunction {
         return resourceUris;
     }
 
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
+
         CatalogFunctionImpl that = (CatalogFunctionImpl) o;
         return Objects.equals(className, that.className)
                 && functionLanguage == that.functionLanguage
-                && Objects.equals(resourceUris, that.resourceUris);
+                && Objects.equals(resourceUris, that.resourceUris)
+                && Objects.equals(options, that.options);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(className, functionLanguage, resourceUris);
+        int result = Objects.hashCode(className);
+        result = 31 * result + Objects.hashCode(functionLanguage);
+        result = 31 * result + Objects.hashCode(resourceUris);
+        result = 31 * result + Objects.hashCode(options);
+        return result;
     }
 
     @Override
@@ -116,6 +138,9 @@ public class CatalogFunctionImpl implements CatalogFunction 
{
                 + "', "
                 + "functionResource='"
                 + getFunctionResources()
+                + "', "
+                + "options='"
+                + getOptions()
                 + "'}";
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java
index 8ae5c9ff179..089e08fd02d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedFunction.java
@@ -184,20 +184,23 @@ public final class ContextResolvedFunction {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
+
         ContextResolvedFunction that = (ContextResolvedFunction) o;
         return isTemporary == that.isTemporary
                 && Objects.equals(functionIdentifier, that.functionIdentifier)
-                && functionDefinition.equals(that.functionDefinition);
+                && functionDefinition.equals(that.functionDefinition)
+                && Objects.equals(catalogFunction, that.catalogFunction);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(isTemporary, functionIdentifier, 
functionDefinition);
+        int result = Boolean.hashCode(isTemporary);
+        result = 31 * result + Objects.hashCode(functionIdentifier);
+        result = 31 * result + functionDefinition.hashCode();
+        result = 31 * result + Objects.hashCode(catalogFunction);
+        return result;
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 058e87a36b6..d0d72467c33 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -55,6 +56,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -115,23 +117,17 @@ public final class FunctionCatalog {
                 name, new InlineCatalogFunction(definition), ignoreIfExists);
     }
 
-    /** Registers a uninstantiated temporary system function. */
-    public void registerTemporarySystemFunction(
-            String name,
-            String fullyQualifiedName,
-            FunctionLanguage language,
-            boolean ignoreIfExists) {
-        registerTemporarySystemFunction(
-                name, new CatalogFunctionImpl(fullyQualifiedName, language), 
ignoreIfExists);
-    }
-
     /** Registers a temporary system function from resource uris. */
     public void registerTemporarySystemFunction(
-            String name, String className, List<ResourceUri> resourceUris) {
+            String name, FunctionDescriptor functionDescriptor, boolean 
ignoreIfExists) {
         registerTemporarySystemFunction(
                 name,
-                new CatalogFunctionImpl(className, FunctionLanguage.JAVA, 
resourceUris),
-                false);
+                new CatalogFunctionImpl(
+                        functionDescriptor.getClassName(),
+                        functionDescriptor.getLanguage(),
+                        functionDescriptor.getResourceUris(),
+                        functionDescriptor.getOptions()),
+                ignoreIfExists);
     }
 
     /** Drops a temporary system function. Returns true if a function was 
dropped. */
@@ -208,37 +204,18 @@ public final class FunctionCatalog {
         return dropTempCatalogFunction(identifier, ignoreIfNotExist) != null;
     }
 
-    /** Registers a catalog function by also considering temporary catalog 
functions. */
-    public void registerCatalogFunction(
-            UnresolvedIdentifier unresolvedIdentifier,
-            Class<? extends UserDefinedFunction> functionClass,
-            boolean ignoreIfExists) {
-        final ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-        final CatalogFunction catalogFunction =
-                new CatalogFunctionImpl(functionClass.getName(), 
FunctionLanguage.JAVA);
-
-        try {
-            UserDefinedFunctionHelper.validateClass(functionClass);
-        } catch (Throwable t) {
-            throw new ValidationException(
-                    String.format(
-                            "Could not register catalog function '%s' due to 
implementation errors.",
-                            identifier.asSummaryString()),
-                    t);
-        }
-
-        registerCatalogFunction(identifier, catalogFunction, ignoreIfExists);
-    }
-
     public void registerCatalogFunction(
             UnresolvedIdentifier unresolvedIdentifier,
-            String className,
-            List<ResourceUri> resourceUris,
+            FunctionDescriptor functionDescriptor,
             boolean ignoreIfExists) {
 
         final ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
         final CatalogFunction catalogFunction =
-                new CatalogFunctionImpl(className, FunctionLanguage.JAVA, 
resourceUris);
+                new CatalogFunctionImpl(
+                        functionDescriptor.getClassName(),
+                        functionDescriptor.getLanguage(),
+                        functionDescriptor.getResourceUris(),
+                        functionDescriptor.getOptions());
 
         registerCatalogFunction(identifier, catalogFunction, ignoreIfExists);
     }
@@ -888,5 +865,20 @@ public final class FunctionCatalog {
         public FunctionDefinition getDefinition() {
             return definition;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            InlineCatalogFunction that = (InlineCatalogFunction) o;
+            return Objects.equals(definition, that.definition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(definition);
+        }
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
index d2bb01d062c..b9118400016 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java
@@ -48,11 +48,12 @@ public class CreateTempSystemFunctionOperation implements 
CreateOperation {
             String functionClass,
             boolean ignoreIfExists,
             FunctionLanguage functionLanguage,
-            List<ResourceUri> resourceUris) {
+            List<ResourceUri> resourceUris,
+            Map<String, String> options) {
         this.functionName = functionName;
         this.ignoreIfExists = ignoreIfExists;
         this.catalogFunction =
-                new CatalogFunctionImpl(functionClass, functionLanguage, 
resourceUris);
+                new CatalogFunctionImpl(functionClass, functionLanguage, 
resourceUris, options);
     }
 
     public CreateTempSystemFunctionOperation(
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index fa9baacbd9e..ae6c8f639a4 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -115,15 +116,14 @@ class FunctionCatalogTest {
         
assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER)).isEmpty();
 
         // test catalog function is found
-        catalog.createFunction(
-                IDENTIFIER.toObjectPath(),
-                new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
-                false);
+        final CatalogFunctionImpl catalogFunction =
+                new CatalogFunctionImpl(FUNCTION_1.getClass().getName());
+        catalog.createFunction(IDENTIFIER.toObjectPath(), catalogFunction, 
false);
 
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, 
catalogFunction));
 
         // test temp catalog function is found
         functionCatalog.registerTemporaryCatalogFunction(
@@ -132,7 +132,9 @@ class FunctionCatalogTest {
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_2));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_2,
+                                new 
FunctionCatalog.InlineCatalogFunction(FUNCTION_2)));
     }
 
     @Test
@@ -141,15 +143,14 @@ class FunctionCatalogTest {
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER)).isEmpty();
 
         // test catalog function is found
-        catalog.createFunction(
-                IDENTIFIER.toObjectPath(),
-                new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
-                false);
+        final CatalogFunctionImpl function =
+                new CatalogFunctionImpl(FUNCTION_1.getClass().getName());
+        catalog.createFunction(IDENTIFIER.toObjectPath(), function, false);
 
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, 
function));
 
         // test temporary catalog function is found
         functionCatalog.registerTemporaryCatalogFunction(
@@ -158,7 +159,9 @@ class FunctionCatalogTest {
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_2));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_2,
+                                new 
FunctionCatalog.InlineCatalogFunction(FUNCTION_2)));
 
         // test system function is found
         moduleManager.loadModule("test_module", new TestModule());
@@ -230,14 +233,22 @@ class FunctionCatalogTest {
     void testUninstantiatedTemporarySystemFunction() {
         // register first time
         functionCatalog.registerTemporarySystemFunction(
-                NAME, FUNCTION_1.getClass().getName(), FunctionLanguage.JAVA, 
false);
+                NAME,
+                
FunctionDescriptor.forClassName(FUNCTION_1.getClass().getName())
+                        .language(FunctionLanguage.JAVA)
+                        .build(),
+                false);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         
ContextResolvedFunction.temporary(FunctionIdentifier.of(NAME), FUNCTION_1));
 
         // register second time lenient
         functionCatalog.registerTemporarySystemFunction(
-                NAME, FUNCTION_2.getClass().getName(), FunctionLanguage.JAVA, 
true);
+                NAME,
+                
FunctionDescriptor.forClassName(FUNCTION_2.getClass().getName())
+                        .language(FunctionLanguage.JAVA)
+                        .build(),
+                true);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         
ContextResolvedFunction.temporary(FunctionIdentifier.of(NAME), FUNCTION_1));
@@ -247,8 +258,10 @@ class FunctionCatalogTest {
                         () ->
                                 
functionCatalog.registerTemporarySystemFunction(
                                         NAME,
-                                        FUNCTION_2.getClass().getName(),
-                                        FunctionLanguage.JAVA,
+                                        FunctionDescriptor.forClassName(
+                                                        
FUNCTION_2.getClass().getName())
+                                                
.language(FunctionLanguage.JAVA)
+                                                .build(),
                                         false))
                 .satisfies(
                         anyCauseMatches(
@@ -260,8 +273,10 @@ class FunctionCatalogTest {
                         () ->
                                 
functionCatalog.registerTemporarySystemFunction(
                                         NAME,
-                                        FUNCTION_INVALID.getClass().getName(),
-                                        FunctionLanguage.JAVA,
+                                        FunctionDescriptor.forClassName(
+                                                        
FUNCTION_INVALID.getClass().getName())
+                                                
.language(FunctionLanguage.JAVA)
+                                                .build(),
                                         false))
                 .satisfies(
                         anyCauseMatches(
@@ -274,7 +289,11 @@ class FunctionCatalogTest {
 
         // test register uninstantiated table function
         functionCatalog.registerTemporarySystemFunction(
-                NAME, TABLE_FUNCTION.getClass().getName(), 
FunctionLanguage.JAVA, false);
+                NAME,
+                
FunctionDescriptor.forClassName(TABLE_FUNCTION.getClass().getName())
+                        .language(FunctionLanguage.JAVA)
+                        .build(),
+                false);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
@@ -284,7 +303,11 @@ class FunctionCatalogTest {
 
         // test register uninstantiated aggregate function
         functionCatalog.registerTemporarySystemFunction(
-                NAME, AGGREGATE_FUNCTION.getClass().getName(), 
FunctionLanguage.JAVA, false);
+                NAME,
+                
FunctionDescriptor.forClassName(AGGREGATE_FUNCTION.getClass().getName())
+                        .language(FunctionLanguage.JAVA)
+                        .build(),
+                false);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
@@ -295,26 +318,37 @@ class FunctionCatalogTest {
     void testCatalogFunction() {
         // register first time
         functionCatalog.registerCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_1.getClass(), false);
+                PARTIAL_UNRESOLVED_IDENTIFIER,
+                
FunctionDescriptor.forFunctionClass(FUNCTION_1.getClass()).build(),
+                false);
+        final CatalogFunctionImpl catalogFunction =
+                new CatalogFunctionImpl(
+                        FUNCTION_1.getClass().getName(),
+                        FunctionLanguage.JAVA,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, 
catalogFunction));
 
         // register second time lenient
         functionCatalog.registerCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), true);
+                PARTIAL_UNRESOLVED_IDENTIFIER,
+                
FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(),
+                true);
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER), FUNCTION_1, 
catalogFunction));
 
         // register second time not lenient
         assertThatThrownBy(
                         () ->
                                 functionCatalog.registerCatalogFunction(
                                         PARTIAL_UNRESOLVED_IDENTIFIER,
-                                        FUNCTION_2.getClass(),
+                                        
FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass())
+                                                .build(),
                                         false))
                 .satisfies(
                         anyCauseMatches(
@@ -349,25 +383,56 @@ class FunctionCatalogTest {
                         () ->
                                 functionCatalog.registerCatalogFunction(
                                         PARTIAL_UNRESOLVED_IDENTIFIER,
-                                        FUNCTION_INVALID.getClass(),
+                                        FunctionDescriptor.forFunctionClass(
+                                                        
FUNCTION_INVALID.getClass())
+                                                .build(),
                                         false))
                 .satisfies(
                         anyCauseMatches(
                                 ValidationException.class,
-                                "Could not register catalog function '"
-                                        + IDENTIFIER.asSummaryString()
+                                "Can not create a function '"
+                                        + FUNCTION_INVALID.getClass().getName()
                                         + "' due to implementation errors."));
     }
 
+    @Test
+    void testCatalogFunctionWithOptions() {
+        // register permanent function
+        functionCatalog.registerCatalogFunction(
+                PARTIAL_UNRESOLVED_IDENTIFIER,
+                FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass())
+                        .option("key1", "option1")
+                        .build(),
+                false);
+        assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
+                .hasValue(
+                        ContextResolvedFunction.permanent(
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_2,
+                                new CatalogFunctionImpl(
+                                        FUNCTION_2.getClass().getName(),
+                                        FunctionLanguage.JAVA,
+                                        Collections.emptyList(),
+                                        Collections.singletonMap("key1", 
"option1"))));
+    }
+
     @Test
     void testTemporaryCatalogFunction() {
         // register permanent function
         functionCatalog.registerCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), false);
+                PARTIAL_UNRESOLVED_IDENTIFIER,
+                
FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(),
+                false);
+        final CatalogFunctionImpl catalogFunction =
+                new CatalogFunctionImpl(
+                        FUNCTION_2.getClass().getName(),
+                        FunctionLanguage.JAVA,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_2));
+                                FunctionIdentifier.of(IDENTIFIER), FUNCTION_2, 
catalogFunction));
 
         // register temporary first time
         functionCatalog.registerTemporaryCatalogFunction(
@@ -375,8 +440,10 @@ class FunctionCatalogTest {
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER),
-                                FUNCTION_1)); // temporary function hides 
catalog function
+                                FunctionIdentifier.of(
+                                        IDENTIFIER), // temporary function 
hides catalog function
+                                FUNCTION_1,
+                                new 
FunctionCatalog.InlineCatalogFunction(FUNCTION_1)));
 
         // dropping catalog functions is not possible in this state
         assertThatThrownBy(
@@ -396,7 +463,8 @@ class FunctionCatalogTest {
                         () ->
                                 functionCatalog.registerCatalogFunction(
                                         PARTIAL_UNRESOLVED_IDENTIFIER,
-                                        FUNCTION_2.getClass(),
+                                        
FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass())
+                                                .build(),
                                         false))
                 .satisfies(
                         anyCauseMatches(
@@ -412,7 +480,9 @@ class FunctionCatalogTest {
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_1,
+                                new 
FunctionCatalog.InlineCatalogFunction(FUNCTION_1)));
 
         // register temporary second time not lenient
         assertThatThrownBy(
@@ -435,7 +505,8 @@ class FunctionCatalogTest {
                 .hasValue(
                         ContextResolvedFunction.permanent(
                                 FunctionIdentifier.of(IDENTIFIER),
-                                FUNCTION_2)); // permanent function is visible 
again
+                                FUNCTION_2,
+                                catalogFunction)); // permanent function is 
visible again
 
         // drop temporary second time lenient
         assertThat(
@@ -472,32 +543,42 @@ class FunctionCatalogTest {
     void testUninstantiatedTemporaryCatalogFunction() {
         // register permanent function
         functionCatalog.registerCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER, FUNCTION_2.getClass(), false);
+                PARTIAL_UNRESOLVED_IDENTIFIER,
+                
FunctionDescriptor.forFunctionClass(FUNCTION_2.getClass()).build(),
+                false);
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.permanent(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_2));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_2,
+                                new CatalogFunctionImpl(
+                                        FUNCTION_2.getClass().getName(),
+                                        FunctionLanguage.JAVA,
+                                        Collections.emptyList(),
+                                        Collections.emptyMap())));
 
         // register temporary first time
+        final CatalogFunctionImpl temporaryCatalogFunction =
+                new CatalogFunctionImpl(FUNCTION_1.getClass().getName());
         functionCatalog.registerTemporaryCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER,
-                new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
-                false);
+                PARTIAL_UNRESOLVED_IDENTIFIER, temporaryCatalogFunction, 
false);
         // temporary function hides catalog function
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_1,
+                                temporaryCatalogFunction));
 
         // register temporary second time lenient
         functionCatalog.registerTemporaryCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER,
-                new CatalogFunctionImpl(FUNCTION_1.getClass().getName()),
-                true);
+                PARTIAL_UNRESOLVED_IDENTIFIER, temporaryCatalogFunction, true);
         assertThat(functionCatalog.lookupFunction(FULL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
FUNCTION_1));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                FUNCTION_1,
+                                temporaryCatalogFunction));
 
         // register temporary second time not lenient
         assertThatThrownBy(
@@ -534,26 +615,30 @@ class FunctionCatalogTest {
         
functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, 
true);
 
         // test register uninstantiated table function
+        final CatalogFunctionImpl temporaryTableCatalogFunction =
+                new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName());
         functionCatalog.registerTemporaryCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER,
-                new CatalogFunctionImpl(TABLE_FUNCTION.getClass().getName()),
-                false);
+                PARTIAL_UNRESOLVED_IDENTIFIER, temporaryTableCatalogFunction, 
false);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
TABLE_FUNCTION));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                TABLE_FUNCTION,
+                                temporaryTableCatalogFunction));
 
         
functionCatalog.dropTemporaryCatalogFunction(PARTIAL_UNRESOLVED_IDENTIFIER, 
true);
 
         // test register uninstantiated aggregate function
+        final CatalogFunctionImpl temporaryAggregateCatalogFunction =
+                new 
CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName());
         functionCatalog.registerTemporaryCatalogFunction(
-                PARTIAL_UNRESOLVED_IDENTIFIER,
-                new 
CatalogFunctionImpl(AGGREGATE_FUNCTION.getClass().getName()),
-                false);
+                PARTIAL_UNRESOLVED_IDENTIFIER, 
temporaryAggregateCatalogFunction, false);
         
assertThat(functionCatalog.lookupFunction(PARTIAL_UNRESOLVED_IDENTIFIER))
                 .hasValue(
                         ContextResolvedFunction.temporary(
-                                FunctionIdentifier.of(IDENTIFIER), 
AGGREGATE_FUNCTION));
+                                FunctionIdentifier.of(IDENTIFIER),
+                                AGGREGATE_FUNCTION,
+                                temporaryAggregateCatalogFunction));
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index cb98e51a09e..0b14e7f8de6 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -385,7 +386,12 @@ public class ResourceManagerTest {
                         new ModuleManager());
 
         functionCatalog.registerCatalogFunction(
-                FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, 
resourceUris, false);
+                FULL_UNRESOLVED_IDENTIFIER1,
+                FunctionDescriptor.forClassName(GENERATED_LOWER_UDF_CLASS)
+                        .language(FunctionLanguage.JAVA)
+                        .resourceUris(resourceUris)
+                        .build(),
+                false);
 
         Map<ResourceUri, ResourceManager.ResourceCounter> 
functionResourceInfos =
                 resourceManager.functionResourceInfos();
@@ -396,7 +402,12 @@ public class ResourceManagerTest {
         // Register catalog function again to validate that unregister catalog 
function will not
         // decrease the reference count of resourceUris.
         functionCatalog.registerCatalogFunction(
-                FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, 
resourceUris, false);
+                FULL_UNRESOLVED_IDENTIFIER1,
+                FunctionDescriptor.forClassName(GENERATED_LOWER_UDF_CLASS)
+                        .language(FunctionLanguage.JAVA)
+                        .resourceUris(resourceUris)
+                        .build(),
+                false);
         functionCatalog.registerTemporaryCatalogFunction(
                 FULL_UNRESOLVED_IDENTIFIER2,
                 new CatalogFunctionImpl(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
index 090c699e33f..e394db7dd71 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.resource.ResourceUri;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** Interface for a function in a catalog. */
@@ -69,4 +71,9 @@ public interface CatalogFunction {
      * @return an {@link ResourceUri} list of the function
      */
     List<ResourceUri> getFunctionResources();
+
+    /** Returns a map of string-based options. */
+    default Map<String, String> getOptions() {
+        return Collections.emptyMap();
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 6c0b31cf10d..269af4f60d0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -558,20 +558,29 @@ public class SqlNodeToOperationConversion {
         UnresolvedIdentifier unresolvedIdentifier =
                 
UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier());
         List<ResourceUri> resourceUris = 
getFunctionResources(sqlCreateFunction.getResourceInfos());
+        final Map<String, String> options =
+                sqlCreateFunction.getPropertyList().getList().stream()
+                        .map(SqlTableOption.class::cast)
+                        .collect(
+                                Collectors.toMap(
+                                        SqlTableOption::getKeyString,
+                                        SqlTableOption::getValueString));
         if (sqlCreateFunction.isSystemFunction()) {
             return new CreateTempSystemFunctionOperation(
                     unresolvedIdentifier.getObjectName(),
                     
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
                     sqlCreateFunction.isIfNotExists(),
                     parseLanguage(sqlCreateFunction.getFunctionLanguage()),
-                    resourceUris);
+                    resourceUris,
+                    options);
         } else {
             FunctionLanguage language = 
parseLanguage(sqlCreateFunction.getFunctionLanguage());
             CatalogFunction catalogFunction =
                     new CatalogFunctionImpl(
                             
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
                             language,
-                            resourceUris);
+                            resourceUris,
+                            options);
             ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
             return new CreateCatalogFunctionOperation(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index facb4a11158..23b11bd531f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -492,7 +492,7 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
             assert identifier.getIdentifier().isPresent();
             serializeCatalogFunction(
                     identifier.getIdentifier().get(),
-                    resolvedFunction.getDefinition(),
+                    resolvedFunction,
                     gen,
                     serializerProvider,
                     serializeCatalogObjects);
@@ -501,7 +501,7 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
 
     private static void serializeCatalogFunction(
             ObjectIdentifier objectIdentifier,
-            FunctionDefinition definition,
+            ContextResolvedFunction resolvedFunction,
             JsonGenerator gen,
             SerializerProvider serializerProvider,
             boolean serializeCatalogObjects)
@@ -512,6 +512,18 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
             return;
         }
 
+        if (resolvedFunction.getCatalogFunction() != null
+                && 
!resolvedFunction.getCatalogFunction().getOptions().isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "Catalog functions with custom options can not be 
serialized into the "
+                                    + "compiled plan. Please set the option 
'%s'='%s' to only"
+                                    + " serialize the function's identifier.",
+                            
TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS.key(),
+                            CatalogPlanCompilation.IDENTIFIER));
+        }
+
+        final FunctionDefinition definition = resolvedFunction.getDefinition();
         if (!(definition instanceof UserDefinedFunction)
                 || !isClassNameSerializable((UserDefinedFunction) definition)) 
{
             throw cannotSerializePermanentCatalogFunction(objectIdentifier);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index da91a54060f..f5f4e4d6696 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1075,7 +1075,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                         "CREATE TEMPORARY SYSTEM FUNCTION: (functionName: 
[test_udf2], "
                                 + "catalogFunction: 
[CatalogFunctionImpl{className='org.apache.fink.function.function2', "
                                 + "functionLanguage='SCALA', "
-                                + 
"functionResource='[ResourceUri{resourceType=JAR, 
uri='file:///path/to/test.jar'}]'}], "
+                                + 
"functionResource='[ResourceUri{resourceType=JAR, 
uri='file:///path/to/test.jar'}]', options='{}'}], "
                                 + "ignoreIfExists: [false], functionLanguage: 
[SCALA])");
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 27d8ea979fc..2487e6427a2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -161,7 +162,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                 UnresolvedIdentifier.of(
                         ObjectIdentifier.of(
                                 catalogManager.getCurrentCatalog(), "default", 
"myFunc")),
-                TableFunc0.class,
+                FunctionDescriptor.forFunctionClass(TableFunc0.class).build(),
                 true);
 
         final String sql =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index c151cac5a02..76b0066256b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -20,12 +20,16 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import 
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.functions.AsyncScalarFunction;
@@ -80,6 +84,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -135,7 +140,23 @@ public class RexNodeJsonSerdeTest {
     private static final NonSerializableFunctionDefinition 
NON_SER_FUNCTION_DEF_IMPL =
             new NonSerializableFunctionDefinition();
     private static final ContextResolvedFunction PERMANENT_FUNCTION =
-            ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL);
+            ContextResolvedFunction.permanent(
+                    FUNCTION_CAT_ID,
+                    SER_UDF_IMPL,
+                    new CatalogFunctionImpl(
+                            SER_UDF_IMPL.getClass().getName(),
+                            FunctionLanguage.JAVA,
+                            Collections.emptyList(),
+                            Collections.emptyMap()));
+    private static final ContextResolvedFunction 
PERMANENT_FUNCTION_WITH_OPTIONS =
+            ContextResolvedFunction.permanent(
+                    FUNCTION_CAT_ID,
+                    SER_UDF_IMPL,
+                    new CatalogFunctionImpl(
+                            SER_UDF_IMPL.getClass().getName(),
+                            FunctionLanguage.JAVA,
+                            Collections.emptyList(),
+                            Map.of("option1", "value1", "option2", "value2")));
 
     @ParameterizedTest
     @MethodSource("testRexNodeSerde")
@@ -329,7 +350,32 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.permanent(
-                                        FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER));
+                                        FUNCTION_CAT_ID,
+                                        SER_UDF_IMPL_OTHER,
+                                        new CatalogFunctionImpl(
+                                                
SER_UDF_IMPL_OTHER.getClass().getName(),
+                                                FunctionLanguage.JAVA,
+                                                Collections.emptyList(),
+                                                Collections.emptyMap())));
+            }
+
+            @Test
+            void withCatalogFunctionWithOptions() throws Exception {
+                final SerdeContext serdeContext = serdeContext(compilation, 
restore);
+                serdeContext
+                        .getFlinkContext()
+                        .getFunctionCatalog()
+                        .registerCatalogFunction(
+                                UNRESOLVED_FUNCTION_CAT_ID,
+                                
FunctionDescriptor.forFunctionClass(SER_UDF_CLASS)
+                                        .option("option1", "value1")
+                                        .build(),
+                                false);
+
+                testJsonRoundTrip(
+                        serdeContext,
+                        createFunctionCall(serdeContext, 
PERMANENT_FUNCTION_WITH_OPTIONS),
+                        RexNode.class);
             }
         }
 
@@ -367,7 +413,10 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.temporary(
-                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+                                        FUNCTION_CAT_ID,
+                                        NON_SER_FUNCTION_DEF_IMPL,
+                                        new 
FunctionCatalog.InlineCatalogFunction(
+                                                NON_SER_FUNCTION_DEF_IMPL)));
             }
         }
 
@@ -402,7 +451,10 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.temporary(
-                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+                                        FUNCTION_CAT_ID,
+                                        NON_SER_FUNCTION_DEF_IMPL,
+                                        new 
FunctionCatalog.InlineCatalogFunction(
+                                                NON_SER_FUNCTION_DEF_IMPL)));
             }
 
             @Test
@@ -418,7 +470,13 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.permanent(
-                                        FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER));
+                                        FUNCTION_CAT_ID,
+                                        SER_UDF_IMPL_OTHER,
+                                        new CatalogFunctionImpl(
+                                                
SER_UDF_IMPL_OTHER.getClass().getName(),
+                                                FunctionLanguage.JAVA,
+                                                Collections.emptyList(),
+                                                Collections.emptyMap())));
             }
         }
     }
@@ -475,7 +533,38 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.temporary(
-                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+                                        FUNCTION_CAT_ID,
+                                        NON_SER_FUNCTION_DEF_IMPL,
+                                        new 
FunctionCatalog.InlineCatalogFunction(
+                                                NON_SER_FUNCTION_DEF_IMPL)));
+            }
+
+            @Test
+            void withCatalogFunctionWithOptions() throws Exception {
+                final SerdeContext serdeContext = serdeContext(compilation, 
restore);
+                serdeContext
+                        .getFlinkContext()
+                        .getFunctionCatalog()
+                        .registerCatalogFunction(
+                                UNRESOLVED_FUNCTION_CAT_ID,
+                                
FunctionDescriptor.forFunctionClass(SER_UDF_CLASS)
+                                        .option("option1", "value1")
+                                        .build(),
+                                false);
+
+                assertThatThrownBy(
+                                () ->
+                                        testJsonRoundTrip(
+                                                serdeContext,
+                                                createFunctionCall(
+                                                        serdeContext,
+                                                        
PERMANENT_FUNCTION_WITH_OPTIONS),
+                                                RexNode.class))
+                        .hasMessageContaining(
+                                "Catalog functions with custom options can"
+                                        + " not be serialized into the 
compiled plan. Please set the option"
+                                        + " 
'table.plan.compile.catalog-objects'='IDENTIFIER' to only serialize"
+                                        + " the function's identifier.");
             }
         }
 
@@ -552,7 +641,10 @@ public class RexNodeJsonSerdeTest {
                 assertThat(actual)
                         .isEqualTo(
                                 ContextResolvedFunction.temporary(
-                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+                                        FUNCTION_CAT_ID,
+                                        NON_SER_FUNCTION_DEF_IMPL,
+                                        new 
FunctionCatalog.InlineCatalogFunction(
+                                                NON_SER_FUNCTION_DEF_IMPL)));
             }
         }
     }
@@ -760,12 +852,17 @@ public class RexNodeJsonSerdeTest {
         serdeContext
                 .getFlinkContext()
                 .getFunctionCatalog()
-                .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, 
SER_UDF_CLASS, false);
+                .registerCatalogFunction(
+                        UNRESOLVED_FUNCTION_CAT_ID,
+                        
FunctionDescriptor.forFunctionClass(SER_UDF_CLASS).build(),
+                        false);
         serdeContext
                 .getFlinkContext()
                 .getFunctionCatalog()
                 .registerCatalogFunction(
-                        UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, 
false);
+                        UNRESOLVED_ASYNC_FUNCTION_CAT_ID,
+                        
FunctionDescriptor.forFunctionClass(SER_ASYNC_UDF_CLASS).build(),
+                        false);
         return serdeContext;
     }
 
@@ -781,7 +878,10 @@ public class RexNodeJsonSerdeTest {
         serdeContext
                 .getFlinkContext()
                 .getFunctionCatalog()
-                .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, 
SER_UDF_CLASS_OTHER, false);
+                .registerCatalogFunction(
+                        UNRESOLVED_FUNCTION_CAT_ID,
+                        
FunctionDescriptor.forFunctionClass(SER_UDF_CLASS_OTHER).build(),
+                        false);
     }
 
     private static void registerTemporaryFunction(SerdeContext serdeContext) {

Reply via email to