dianfu commented on a change in pull request #12476:
URL: https://github.com/apache/flink/pull/12476#discussion_r435883002



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -154,7 +155,9 @@ def load_module(self, module_name: str, module: Module):
         ValidationException is thrown when there is already a module with the 
same name.
 
         :param module_name: Name of the :class:`~pyflink.table.Module`.
+        :type module_name: str

Review comment:
       could be removed since the type information is already available in the 
typehint.

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -166,11 +169,254 @@ def unload_module(self, module_name: str):
         ValidationException is thrown when there is no module with the given 
name.
 
         :param module_name: Name of the :class:`~pyflink.table.Module`.
+        :type module_name: str
 
         .. versionadded:: 1.12.0
         """
         self._j_tenv.unloadModule(module_name)
 
+    def create_java_temporary_system_function(self, name: str, 
function_class_name: str):
+        """
+        Registers a java user defined function class as a temporary system 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_function`, system 
functions are identified
+        by a global name that is independent of the current catalog and 
current database. Thus,
+        this method allows to extend the set of built-in system functions like 
TRIM, ABS, etc.
+
+        Temporary functions can shadow permanent ones. If a permanent function 
under a given name
+        exists, it will be inaccessible in the current session. To make the 
permanent function
+        available again one can drop the corresponding temporary system 
function.
+
+        Example:
+        ::
+
+            >>> table_env.create_java_temporary_system_function("func",
+            ...     "java.user.defined.function.class.name")
+
+        :param name: The name under which the function will be registered 
globally.
+        :type name: str
+        :param function_class_name: The java full qualified class name of the 
function class
+                                    containing the implementation. The 
function must have a
+                                    public no-argument constructor and can be 
founded in current
+                                    Java classloader.
+        :type function_class_name: str
+
+        .. versionadded:: 1.12.0
+        """
+        gateway = get_gateway()
+        java_function = 
gateway.jvm.Thread.currentThread().getContextClassLoader() \
+            .loadClass(function_class_name)
+        self._j_tenv.createTemporarySystemFunction(name, java_function)
+
+    def create_temporary_system_function(self, name: str,
+                                         function: UserDefinedFunctionWrapper):
+        """
+        Registers a python user defined function class as a temporary system 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_function`, system 
functions are identified
+        by a global name that is independent of the current catalog and 
current database. Thus,
+        this method allows to extend the set of built-in system functions like 
TRIM, ABS, etc.
+
+        Temporary functions can shadow permanent ones. If a permanent function 
under a given name
+        exists, it will be inaccessible in the current session. To make the 
permanent function
+        available again one can drop the corresponding temporary system 
function.
+
+        Example:
+        ::
+
+            >>> table_env.create_temporary_system_function(
+            ...     "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), 
DataTypes.BIGINT()))
+
+            >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
+            ...      result_type=DataTypes.BIGINT())
+            ... def add(i, j):
+            ...     return i + j
+            >>> table_env.create_temporary_system_function("add", add)
+
+            >>> class SubtractOne(ScalarFunction):
+            ...     def eval(self, i):
+            ...         return i - 1
+            >>> table_env.create_temporary_system_function(
+            ...     "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), 
DataTypes.BIGINT()))
+
+        :param name: The name under which the function will be registered 
globally.
+        :type name: str
+        :param function: The function class containing the implementation. The 
function must have a
+                         public no-argument constructor and can be founded in 
current Java
+                         classloader.
+        :type function: pyflink.table.udf.UserDefinedFunctionWrapper
+
+        .. versionadded:: 1.12.0
+        """
+        java_function = function.java_user_defined_function()
+        self._j_tenv.createTemporarySystemFunction(name, java_function)
+
+    def drop_temporary_system_function(self, name: str):
+        """
+        Drops a temporary system function registered under the given name.
+
+        If a permanent function with the given name exists, it will be used 
from now on for any
+        queries that reference this name.
+
+        :param name: The name under which the function has been registered 
globally.
+        :type name: str
+        :return: true if a function existed under the given name and was 
removed.
+        :rtype: bool
+
+        .. versionadded:: 1.12.0
+        """
+        return self._j_tenv.dropTemporarySystemFunction(name)
+
+    def create_java_function(self, path: str, function_class_name: str,

Review comment:
       should also add `create_function`?

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -166,11 +169,254 @@ def unload_module(self, module_name: str):
         ValidationException is thrown when there is no module with the given 
name.
 
         :param module_name: Name of the :class:`~pyflink.table.Module`.
+        :type module_name: str
 
         .. versionadded:: 1.12.0
         """
         self._j_tenv.unloadModule(module_name)
 
+    def create_java_temporary_system_function(self, name: str, 
function_class_name: str):
+        """
+        Registers a java user defined function class as a temporary system 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_function`, system 
functions are identified
+        by a global name that is independent of the current catalog and 
current database. Thus,
+        this method allows to extend the set of built-in system functions like 
TRIM, ABS, etc.
+
+        Temporary functions can shadow permanent ones. If a permanent function 
under a given name
+        exists, it will be inaccessible in the current session. To make the 
permanent function
+        available again one can drop the corresponding temporary system 
function.
+
+        Example:
+        ::
+
+            >>> table_env.create_java_temporary_system_function("func",
+            ...     "java.user.defined.function.class.name")
+
+        :param name: The name under which the function will be registered 
globally.
+        :type name: str
+        :param function_class_name: The java full qualified class name of the 
function class
+                                    containing the implementation. The 
function must have a
+                                    public no-argument constructor and can be 
founded in current
+                                    Java classloader.
+        :type function_class_name: str
+
+        .. versionadded:: 1.12.0
+        """
+        gateway = get_gateway()
+        java_function = 
gateway.jvm.Thread.currentThread().getContextClassLoader() \
+            .loadClass(function_class_name)
+        self._j_tenv.createTemporarySystemFunction(name, java_function)
+
+    def create_temporary_system_function(self, name: str,
+                                         function: UserDefinedFunctionWrapper):
+        """
+        Registers a python user defined function class as a temporary system 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_function`, system 
functions are identified
+        by a global name that is independent of the current catalog and 
current database. Thus,
+        this method allows to extend the set of built-in system functions like 
TRIM, ABS, etc.
+
+        Temporary functions can shadow permanent ones. If a permanent function 
under a given name
+        exists, it will be inaccessible in the current session. To make the 
permanent function
+        available again one can drop the corresponding temporary system 
function.
+
+        Example:
+        ::
+
+            >>> table_env.create_temporary_system_function(
+            ...     "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), 
DataTypes.BIGINT()))
+
+            >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
+            ...      result_type=DataTypes.BIGINT())
+            ... def add(i, j):
+            ...     return i + j
+            >>> table_env.create_temporary_system_function("add", add)
+
+            >>> class SubtractOne(ScalarFunction):
+            ...     def eval(self, i):
+            ...         return i - 1
+            >>> table_env.create_temporary_system_function(
+            ...     "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), 
DataTypes.BIGINT()))
+
+        :param name: The name under which the function will be registered 
globally.
+        :type name: str
+        :param function: The function class containing the implementation. The 
function must have a
+                         public no-argument constructor and can be founded in 
current Java
+                         classloader.
+        :type function: pyflink.table.udf.UserDefinedFunctionWrapper
+
+        .. versionadded:: 1.12.0
+        """
+        java_function = function.java_user_defined_function()
+        self._j_tenv.createTemporarySystemFunction(name, java_function)
+
+    def drop_temporary_system_function(self, name: str):
+        """
+        Drops a temporary system function registered under the given name.
+
+        If a permanent function with the given name exists, it will be used 
from now on for any
+        queries that reference this name.
+
+        :param name: The name under which the function has been registered 
globally.
+        :type name: str
+        :return: true if a function existed under the given name and was 
removed.
+        :rtype: bool
+
+        .. versionadded:: 1.12.0
+        """
+        return self._j_tenv.dropTemporarySystemFunction(name)
+
+    def create_java_function(self, path: str, function_class_name: str,
+                             ignore_if_exists: bool = None):
+        """
+        Registers a java user defined function class as a catalog function in 
the given path.
+
+        Compared to system functions with a globally defined name, catalog 
functions are always
+        (implicitly or explicitly) identified by a catalog and database.
+
+        There must not be another function (temporary or permanent) registered 
under the same path.
+
+        Example:
+        ::
+
+            >>> table_env.create_java_function("func", 
"java.user.defined.function.class.name")
+
+        :param path: The path under which the function will be registered.
+                     See also the :class:`~pyflink.table.TableEnvironment` 
class description for
+                     the format of the path.
+        :type path: str
+        :param function_class_name: The java full qualified class name of the 
function class
+                                    containing the implementation. The 
function must have a
+                                    public no-argument constructor and can be 
founded in current
+                                    Java classloader.
+        :type function_class_name: str
+        :param ignore_if_exists: If a function exists under the given path and 
this flag is set,
+                                 no operation is executed. An exception is 
thrown otherwise.
+        :type ignore_if_exists: bool
+
+        .. versionadded:: 1.12.0
+        """
+        gateway = get_gateway()
+        java_function = 
gateway.jvm.Thread.currentThread().getContextClassLoader() \
+            .loadClass(function_class_name)
+        if ignore_if_exists is None:
+            self._j_tenv.createFunction(path, java_function)
+        else:
+            self._j_tenv.createFunction(path, java_function, ignore_if_exists)
+
+    def drop_function(self, path):
+        """
+        Drops a catalog function registered in the given path.
+
+        :param path: The path under which the function will be registered.
+                     See also the :class:`~pyflink.table.TableEnvironment` 
class description for
+                     the format of the path.
+        :type path: str
+        :return: true if a function existed in the given path and was removed.
+        :rtype: bool
+
+        .. versionadded:: 1.12.0
+        """
+        return self._j_tenv.dropFunction(path)
+
+    def create_java_temporary_function(self, path: str, function_class_name: 
str):
+        """
+        Registers a java user defined function class as a temporary catalog 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_system_function` with 
a globally defined

Review comment:
       create_temporary_system_function -> create_java_temporary_system_function

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -166,11 +169,254 @@ def unload_module(self, module_name: str):
         ValidationException is thrown when there is no module with the given 
name.
 
         :param module_name: Name of the :class:`~pyflink.table.Module`.
+        :type module_name: str

Review comment:
       ditto

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -166,11 +169,254 @@ def unload_module(self, module_name: str):
         ValidationException is thrown when there is no module with the given 
name.
 
         :param module_name: Name of the :class:`~pyflink.table.Module`.
+        :type module_name: str
 
         .. versionadded:: 1.12.0
         """
         self._j_tenv.unloadModule(module_name)
 
+    def create_java_temporary_system_function(self, name: str, 
function_class_name: str):
+        """
+        Registers a java user defined function class as a temporary system 
function.
+
+        Compared to .. seealso:: :func:`create_temporary_function`, system 
functions are identified

Review comment:
       create_temporary_function -> create_java_temporary_function




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to