This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc0116948d4 [FLINK-38508][python] Allow string type specifications in Python UDF (#27106)
cc0116948d4 is described below

commit cc0116948d42c85bb0d59e353228d10772193f7f
Author: Shengkai <[email protected]>
AuthorDate: Thu Oct 16 11:12:34 2025 +0800

    [FLINK-38508][python] Allow string type specifications in Python UDF (#27106)
---
 .../pyflink/table/tests/test_pandas_udaf.py        |  17 +
 flink-python/pyflink/table/tests/test_udf.py       | 365 +++++++++++----------
 flink-python/pyflink/table/tests/test_udtf.py      |  14 +
 flink-python/pyflink/table/udf.py                  |   4 -
 .../functions/python/PythonAggregateFunction.java  |   7 +-
 .../functions/python/PythonScalarFunction.java     |   7 +-
 .../python/PythonTableAggregateFunction.java       |   7 +-
 .../functions/python/PythonTableFunction.java      |   7 +-
 8 files changed, 234 insertions(+), 194 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index 650efcd966c..09e17dd1fca 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -17,6 +17,7 @@
 
################################################################################
 import uuid
 
+from pyflink.common import Row
 from pyflink.table.expressions import col, call, lit, row_interval
 from pyflink.table.types import DataTypes
 from pyflink.table.udf import udaf, udf, AggregateFunction
@@ -39,6 +40,17 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                                                                    
func_type="pandas"))
         cls.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
 
+    def test_pandas_udaf_in_sql(self):
+        sql = f"""
+             CREATE TEMPORARY FUNCTION pymean AS
+             '{BatchPandasUDAFITTests.__module__}.mean_str_udaf'
+             LANGUAGE PYTHON
+            """
+        self.t_env.execute_sql(sql)
+        self.assert_equals(
+            list(self.t_env.execute_sql("SELECT pymean(1)").collect()),
+            [Row(1)])
+
     def test_check_result_type(self):
         def pandas_udaf():
             pass
@@ -861,6 +873,11 @@ def mean_udaf(v):
     return v.mean()
 
 
+@udaf(input_types=['FLOAT'], result_type='FLOAT', func_type="pandas")
+def mean_str_udaf(v):
+    return v.mean()
+
+
 class MaxAdd(AggregateFunction):
 
     def __init__(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 08b139a3f6f..4667702bbbd 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -697,6 +697,192 @@ class UserDefinedFunctionTests(object):
                             "{1=flink, 2=pyflink}, 
1000000000000000000.050000000000000000, "
                             "1000000000000000000.059999999999999999]"])
 
+    def test_using_udf_in_sql(self):
+        sql = f"""
+                  CREATE TEMPORARY FUNCTION echo AS
+                  '{UserDefinedFunctionTests.__module__}.echo'
+                  LANGUAGE PYTHON
+                 """
+        self.t_env.execute_sql(sql)
+        self.assert_equals(
+            list(self.t_env.execute_sql("SELECT echo(1)").collect()),
+            [Row(1)])
+
+    def test_all_data_types_string(self):
+        @udf(result_type='BOOLEAN')
+        def boolean_func(bool_param):
+            assert isinstance(bool_param, bool), 'bool_param of wrong type %s 
!' \
+                                                 % type(bool_param)
+            return bool_param
+
+        @udf(result_type='TINYINT')
+        def tinyint_func(tinyint_param):
+            assert isinstance(tinyint_param, int), 'tinyint_param of wrong 
type %s !' \
+                                                   % type(tinyint_param)
+            return tinyint_param
+
+        @udf(result_type='SMALLINT')
+        def smallint_func(smallint_param):
+            assert isinstance(smallint_param, int), 'smallint_param of wrong 
type %s !' \
+                                                    % type(smallint_param)
+            assert smallint_param == 32767, 'smallint_param of wrong value %s' 
% smallint_param
+            return smallint_param
+
+        @udf(result_type='INT')
+        def int_func(int_param):
+            assert isinstance(int_param, int), 'int_param of wrong type %s !' \
+                                               % type(int_param)
+            assert int_param == -2147483648, 'int_param of wrong value %s' % 
int_param
+            return int_param
+
+        @udf(result_type='BIGINT')
+        def bigint_func(bigint_param):
+            assert isinstance(bigint_param, int), 'bigint_param of wrong type 
%s !' \
+                                                  % type(bigint_param)
+            return bigint_param
+
+        @udf(result_type='BIGINT')
+        def bigint_func_none(bigint_param):
+            assert bigint_param is None, 'bigint_param %s should be None!' % 
bigint_param
+            return bigint_param
+
+        @udf(result_type='FLOAT')
+        def float_func(float_param):
+            assert isinstance(float_param, float) and float_equal(float_param, 
1.23, 1e-6), \
+                'float_param is wrong value %s !' % float_param
+            return float_param
+
+        @udf(result_type='DOUBLE')
+        def double_func(double_param):
+            assert isinstance(double_param, float) and 
float_equal(double_param, 1.98932, 1e-7), \
+                'double_param is wrong value %s !' % double_param
+            return double_param
+
+        @udf(result_type='BYTES')
+        def bytes_func(bytes_param):
+            assert bytes_param == b'flink', \
+                'bytes_param is wrong value %s !' % bytes_param
+            return bytes_param
+
+        @udf(result_type='STRING')
+        def str_func(str_param):
+            assert str_param == 'pyflink', \
+                'str_param is wrong value %s !' % str_param
+            return str_param
+
+        @udf(result_type='DATE')
+        def date_func(date_param):
+            from datetime import date
+            assert date_param == date(year=2014, month=9, day=13), \
+                'date_param is wrong value %s !' % date_param
+            return date_param
+
+        @udf(result_type='TIME')
+        def time_func(time_param):
+            from datetime import time
+            assert time_param == time(hour=12, minute=0, second=0, 
microsecond=123000), \
+                'time_param is wrong value %s !' % time_param
+            return time_param
+
+        @udf(result_type='TIMESTAMP(3)')
+        def timestamp_func(timestamp_param):
+            from datetime import datetime
+            assert timestamp_param == datetime(2018, 3, 11, 3, 0, 0, 123000), \
+                'timestamp_param is wrong value %s !' % timestamp_param
+            return timestamp_param
+
+        @udf(result_type='ARRAY<BIGINT>')
+        def array_func(array_param):
+            assert array_param == [[1, 2, 3]] or array_param == ((1, 2, 3),), \
+                'array_param is wrong value %s !' % array_param
+            return array_param[0]
+
+        @udf(result_type='MAP<BIGINT, STRING>')
+        def map_func(map_param):
+            assert map_param == {1: 'flink', 2: 'pyflink'}, \
+                'map_param is wrong value %s !' % map_param
+            return map_param
+
+        @udf(result_type='DECIMAL(38, 18)')
+        def decimal_func(decimal_param):
+            from decimal import Decimal
+            assert decimal_param == 
Decimal('1000000000000000000.050000000000000000'), \
+                'decimal_param is wrong value %s !' % decimal_param
+            return decimal_param
+
+        @udf(result_type='DECIMAL(38, 18)')
+        def decimal_cut_func(decimal_param):
+            from decimal import Decimal
+            assert decimal_param == 
Decimal('1000000000000000000.059999999999999999'), \
+                'decimal_param is wrong value %s !' % decimal_param
+            return decimal_param
+
+        sink_table = generate_random_table_name()
+        sink_table_ddl = f"""
+               CREATE TABLE {sink_table}(
+               a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g 
FLOAT, h DOUBLE,
+               i BYTES, j STRING, k DATE, l TIME, m TIMESTAMP(3), n 
ARRAY<BIGINT>,
+               o MAP<BIGINT, STRING>, p DECIMAL(38, 18), q DECIMAL(38, 18))
+               WITH ('connector'='test-sink')
+           """
+        self.t_env.execute_sql(sink_table_ddl)
+
+        import datetime
+        import decimal
+        t = self.t_env.from_elements(
+            [(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
+              bytearray(b'flink'), 'pyflink', datetime.date(2014, 9, 13),
+              datetime.time(hour=12, minute=0, second=0, microsecond=123000),
+              datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
+              {1: 'flink', 2: 'pyflink'}, 
decimal.Decimal('1000000000000000000.05'),
+              
decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
+            DataTypes.ROW(
+                [DataTypes.FIELD("a", DataTypes.BIGINT()),
+                 DataTypes.FIELD("b", DataTypes.BIGINT()),
+                 DataTypes.FIELD("c", DataTypes.TINYINT()),
+                 DataTypes.FIELD("d", DataTypes.BOOLEAN()),
+                 DataTypes.FIELD("e", DataTypes.SMALLINT()),
+                 DataTypes.FIELD("f", DataTypes.INT()),
+                 DataTypes.FIELD("g", DataTypes.FLOAT()),
+                 DataTypes.FIELD("h", DataTypes.DOUBLE()),
+                 DataTypes.FIELD("i", DataTypes.BYTES()),
+                 DataTypes.FIELD("j", DataTypes.STRING()),
+                 DataTypes.FIELD("k", DataTypes.DATE()),
+                 DataTypes.FIELD("l", DataTypes.TIME()),
+                 DataTypes.FIELD("m", DataTypes.TIMESTAMP(3)),
+                 DataTypes.FIELD("n", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
+                 DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(), 
DataTypes.STRING())),
+                 DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
+                 DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
+
+        t.select(
+            bigint_func(t.a),
+            bigint_func_none(t.b),
+            tinyint_func(t.c),
+            boolean_func(t.d),
+            smallint_func(t.e),
+            int_func(t.f),
+            float_func(t.g),
+            double_func(t.h),
+            bytes_func(t.i),
+            str_func(t.j),
+            date_func(t.k),
+            time_func(t.l),
+            timestamp_func(t.m),
+            array_func(t.n),
+            map_func(t.o),
+            decimal_func(t.p),
+            decimal_cut_func(t.q)) \
+            .execute_insert(sink_table).wait()
+        actual = source_sink_utils.results()
+        # Currently the sink result precision of DataTypes.TIME(precision) 
only supports 0.
+        self.assert_equals(actual,
+                           ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 
1.98932, "
+                            "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 
12:00:00.123, "
+                            "2018-03-11T03:00:00.123, [1, 2, 3], "
+                            "{1=flink, 2=pyflink}, 
1000000000000000000.050000000000000000, "
+                            "1000000000000000000.059999999999999999]"])
+
     def test_create_and_drop_function(self):
         t_env = self.t_env
 
@@ -912,180 +1098,6 @@ class 
PyFlinkEmbeddedThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTest
         super(PyFlinkEmbeddedThreadTests, self).setUp()
         self.t_env.get_config().set("python.execution-mode", "thread")
 
-    def test_all_data_types_string(self):
-        @udf(result_type='BOOLEAN')
-        def boolean_func(bool_param):
-            assert isinstance(bool_param, bool), 'bool_param of wrong type %s 
!' \
-                                                 % type(bool_param)
-            return bool_param
-
-        @udf(result_type='TINYINT')
-        def tinyint_func(tinyint_param):
-            assert isinstance(tinyint_param, int), 'tinyint_param of wrong 
type %s !' \
-                                                   % type(tinyint_param)
-            return tinyint_param
-
-        @udf(result_type='SMALLINT')
-        def smallint_func(smallint_param):
-            assert isinstance(smallint_param, int), 'smallint_param of wrong 
type %s !' \
-                                                    % type(smallint_param)
-            assert smallint_param == 32767, 'smallint_param of wrong value %s' 
% smallint_param
-            return smallint_param
-
-        @udf(result_type='INT')
-        def int_func(int_param):
-            assert isinstance(int_param, int), 'int_param of wrong type %s !' \
-                                               % type(int_param)
-            assert int_param == -2147483648, 'int_param of wrong value %s' % 
int_param
-            return int_param
-
-        @udf(result_type='BIGINT')
-        def bigint_func(bigint_param):
-            assert isinstance(bigint_param, int), 'bigint_param of wrong type 
%s !' \
-                                                  % type(bigint_param)
-            return bigint_param
-
-        @udf(result_type='BIGINT')
-        def bigint_func_none(bigint_param):
-            assert bigint_param is None, 'bigint_param %s should be None!' % 
bigint_param
-            return bigint_param
-
-        @udf(result_type='FLOAT')
-        def float_func(float_param):
-            assert isinstance(float_param, float) and float_equal(float_param, 
1.23, 1e-6), \
-                'float_param is wrong value %s !' % float_param
-            return float_param
-
-        @udf(result_type='DOUBLE')
-        def double_func(double_param):
-            assert isinstance(double_param, float) and 
float_equal(double_param, 1.98932, 1e-7), \
-                'double_param is wrong value %s !' % double_param
-            return double_param
-
-        @udf(result_type='BYTES')
-        def bytes_func(bytes_param):
-            assert bytes_param == b'flink', \
-                'bytes_param is wrong value %s !' % bytes_param
-            return bytes_param
-
-        @udf(result_type='STRING')
-        def str_func(str_param):
-            assert str_param == 'pyflink', \
-                'str_param is wrong value %s !' % str_param
-            return str_param
-
-        @udf(result_type='DATE')
-        def date_func(date_param):
-            from datetime import date
-            assert date_param == date(year=2014, month=9, day=13), \
-                'date_param is wrong value %s !' % date_param
-            return date_param
-
-        @udf(result_type='TIME')
-        def time_func(time_param):
-            from datetime import time
-            assert time_param == time(hour=12, minute=0, second=0, 
microsecond=123000), \
-                'time_param is wrong value %s !' % time_param
-            return time_param
-
-        @udf(result_type='TIMESTAMP(3)')
-        def timestamp_func(timestamp_param):
-            from datetime import datetime
-            assert timestamp_param == datetime(2018, 3, 11, 3, 0, 0, 123000), \
-                'timestamp_param is wrong value %s !' % timestamp_param
-            return timestamp_param
-
-        @udf(result_type='ARRAY<BIGINT>')
-        def array_func(array_param):
-            assert array_param == [[1, 2, 3]] or array_param == ((1, 2, 3),), \
-                'array_param is wrong value %s !' % array_param
-            return array_param[0]
-
-        @udf(result_type='MAP<BIGINT, STRING>')
-        def map_func(map_param):
-            assert map_param == {1: 'flink', 2: 'pyflink'}, \
-                'map_param is wrong value %s !' % map_param
-            return map_param
-
-        @udf(result_type='DECIMAL(38, 18)')
-        def decimal_func(decimal_param):
-            from decimal import Decimal
-            assert decimal_param == 
Decimal('1000000000000000000.050000000000000000'), \
-                'decimal_param is wrong value %s !' % decimal_param
-            return decimal_param
-
-        @udf(result_type='DECIMAL(38, 18)')
-        def decimal_cut_func(decimal_param):
-            from decimal import Decimal
-            assert decimal_param == 
Decimal('1000000000000000000.059999999999999999'), \
-                'decimal_param is wrong value %s !' % decimal_param
-            return decimal_param
-
-        sink_table = generate_random_table_name()
-        sink_table_ddl = f"""
-            CREATE TABLE {sink_table}(
-            a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g 
FLOAT, h DOUBLE, i BYTES,
-            j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o 
MAP<BIGINT, STRING>,
-            p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH 
('connector'='test-sink')
-        """
-        self.t_env.execute_sql(sink_table_ddl)
-
-        import datetime
-        import decimal
-        t = self.t_env.from_elements(
-            [(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
-              bytearray(b'flink'), 'pyflink', datetime.date(2014, 9, 13),
-              datetime.time(hour=12, minute=0, second=0, microsecond=123000),
-              datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
-              {1: 'flink', 2: 'pyflink'}, 
decimal.Decimal('1000000000000000000.05'),
-              
decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
-            DataTypes.ROW(
-                [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                 DataTypes.FIELD("b", DataTypes.BIGINT()),
-                 DataTypes.FIELD("c", DataTypes.TINYINT()),
-                 DataTypes.FIELD("d", DataTypes.BOOLEAN()),
-                 DataTypes.FIELD("e", DataTypes.SMALLINT()),
-                 DataTypes.FIELD("f", DataTypes.INT()),
-                 DataTypes.FIELD("g", DataTypes.FLOAT()),
-                 DataTypes.FIELD("h", DataTypes.DOUBLE()),
-                 DataTypes.FIELD("i", DataTypes.BYTES()),
-                 DataTypes.FIELD("j", DataTypes.STRING()),
-                 DataTypes.FIELD("k", DataTypes.DATE()),
-                 DataTypes.FIELD("l", DataTypes.TIME()),
-                 DataTypes.FIELD("m", DataTypes.TIMESTAMP(3)),
-                 DataTypes.FIELD("n", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
-                 DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(), 
DataTypes.STRING())),
-                 DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
-                 DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
-
-        t.select(
-            bigint_func(t.a),
-            bigint_func_none(t.b),
-            tinyint_func(t.c),
-            boolean_func(t.d),
-            smallint_func(t.e),
-            int_func(t.f),
-            float_func(t.g),
-            double_func(t.h),
-            bytes_func(t.i),
-            str_func(t.j),
-            date_func(t.k),
-            time_func(t.l),
-            timestamp_func(t.m),
-            array_func(t.n),
-            map_func(t.o),
-            decimal_func(t.p),
-            decimal_cut_func(t.q)) \
-            .execute_insert(sink_table).wait()
-        actual = source_sink_utils.results()
-        # Currently the sink result precision of DataTypes.TIME(precision) 
only supports 0.
-        self.assert_equals(actual,
-                           ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 
1.98932, "
-                            "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 
12:00:00.123, "
-                            "2018-03-11T03:00:00.123, [1, 2, 3], "
-                            "{1=flink, 2=pyflink}, 
1000000000000000000.050000000000000000, "
-                            "1000000000000000000.059999999999999999]"])
-
 
 # test specify the input_types
 @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
result_type=DataTypes.BIGINT())
@@ -1141,6 +1153,11 @@ class CallablePlus(object):
         return col + 1
 
 
+@udf(input_types=['BIGINT'], result_type='BIGINT')
+def echo(i: str):
+    return i
+
+
 if __name__ == '__main__':
     import unittest
 
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 3055a9edacf..bf3ca4fb5c5 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -17,6 +17,7 @@
 
################################################################################
 import unittest
 
+from pyflink.common import Row
 from pyflink.table import DataTypes
 from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
 from pyflink.table.expressions import col
@@ -71,6 +72,19 @@ class UserDefinedTableFunctionTests(object):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[1, 1, 0]", "+I[2, 2, 0]", "+I[3, 3, 
0]", "+I[3, 3, 1]"])
 
+    def test_table_function_in_sql(self):
+        sql = f"""
+                   CREATE TEMPORARY FUNCTION pyfunc AS
+                   '{UserDefinedTableFunctionTests.__module__}.identity'
+                   LANGUAGE PYTHON
+                  """
+        self.t_env.execute_sql(sql)
+        self.assert_equals(
+            list(self.t_env.execute_sql(
+                "SELECT v FROM (VALUES (1)) AS T(id), LATERAL 
TABLE(pyfunc(id)) AS P(v)"
+            ).collect()),
+            [Row(1)])
+
 
 class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                             PyFlinkStreamTableTestCase):
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 5b63cda8e94..c32d8ec9490 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -573,10 +573,6 @@ class 
UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper):
             else:
                 self._accumulator_type = 'ARRAY<{0}>'.format(self._result_type)
 
-        if j_input_types is not None:
-            gateway = get_gateway()
-            j_input_types = java_utils.to_jarray(
-                gateway.jvm.DataType, [_to_java_data_type(i) for i in 
self._input_types])
         if isinstance(self._result_type, DataType):
             j_result_type = _to_java_data_type(self._result_type)
         else:
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
index c518c3ddcbd..d1bf6d61122 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
@@ -172,10 +172,9 @@ public class PythonAggregateFunction extends AggregateFunction implements Python
 
         if (inputTypesString != null) {
             inputTypes =
-                    (DataType[])
-                            Arrays.stream(inputTypesString)
-                                    .map(typeFactory::createDataType)
-                                    .toArray();
+                    Arrays.stream(inputTypesString)
+                            .map(typeFactory::createDataType)
+                            .toArray(DataType[]::new);
         }
 
         if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index fe3c1fc9f75..9d14db265f9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -159,10 +159,9 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
 
         if (inputTypesString != null) {
             inputTypes =
-                    (DataType[])
-                            Arrays.stream(inputTypesString)
-                                    .map(typeFactory::createDataType)
-                                    .toArray();
+                    Arrays.stream(inputTypesString)
+                            .map(typeFactory::createDataType)
+                            .toArray(DataType[]::new);
         }
 
         if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
index 3d29d20ecb0..10083679294 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
@@ -171,10 +171,9 @@ public class PythonTableAggregateFunction extends TableAggregateFunction impleme
         TypeInference.Builder builder = TypeInference.newBuilder();
         if (inputTypesString != null) {
             inputTypes =
-                    (DataType[])
-                            Arrays.stream(inputTypesString)
-                                    .map(typeFactory::createDataType)
-                                    .toArray();
+                    Arrays.stream(inputTypesString)
+                            .map(typeFactory::createDataType)
+                            .toArray(DataType[]::new);
         }
 
         if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index 19da281bdb9..8d6ac79d813 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -160,10 +160,9 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
 
         if (inputTypesString != null) {
             inputTypes =
-                    (DataType[])
-                            Arrays.stream(inputTypesString)
-                                    .map(typeFactory::createDataType)
-                                    .toArray();
+                    Arrays.stream(inputTypesString)
+                            .map(typeFactory::createDataType)
+                            .toArray(DataType[]::new);
         }
 
         if (inputTypes != null) {

Reply via email to