This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc0116948d4 [FLINK-38508][python] Allow string type specifications in Python UDF (#27106)
cc0116948d4 is described below
commit cc0116948d42c85bb0d59e353228d10772193f7f
Author: Shengkai <[email protected]>
AuthorDate: Thu Oct 16 11:12:34 2025 +0800
[FLINK-38508][python] Allow string type specifications in Python UDF (#27106)
---
.../pyflink/table/tests/test_pandas_udaf.py | 17 +
flink-python/pyflink/table/tests/test_udf.py | 365 +++++++++++----------
flink-python/pyflink/table/tests/test_udtf.py | 14 +
flink-python/pyflink/table/udf.py | 4 -
.../functions/python/PythonAggregateFunction.java | 7 +-
.../functions/python/PythonScalarFunction.java | 7 +-
.../python/PythonTableAggregateFunction.java | 7 +-
.../functions/python/PythonTableFunction.java | 7 +-
8 files changed, 234 insertions(+), 194 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index 650efcd966c..09e17dd1fca 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -17,6 +17,7 @@
################################################################################
import uuid
+from pyflink.common import Row
from pyflink.table.expressions import col, call, lit, row_interval
from pyflink.table.types import DataTypes
from pyflink.table.udf import udaf, udf, AggregateFunction
@@ -39,6 +40,17 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
func_type="pandas"))
cls.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
+ def test_pandas_udaf_in_sql(self):
+ sql = f"""
+ CREATE TEMPORARY FUNCTION pymean AS
+ '{BatchPandasUDAFITTests.__module__}.mean_str_udaf'
+ LANGUAGE PYTHON
+ """
+ self.t_env.execute_sql(sql)
+ self.assert_equals(
+ list(self.t_env.execute_sql("SELECT pymean(1)").collect()),
+ [Row(1)])
+
def test_check_result_type(self):
def pandas_udaf():
pass
@@ -861,6 +873,11 @@ def mean_udaf(v):
return v.mean()
+@udaf(input_types=['FLOAT'], result_type='FLOAT', func_type="pandas")
+def mean_str_udaf(v):
+ return v.mean()
+
+
class MaxAdd(AggregateFunction):
def __init__(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 08b139a3f6f..4667702bbbd 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -697,6 +697,192 @@ class UserDefinedFunctionTests(object):
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999]"])
+ def test_using_udf_in_sql(self):
+ sql = f"""
+ CREATE TEMPORARY FUNCTION echo AS
+ '{UserDefinedFunctionTests.__module__}.echo'
+ LANGUAGE PYTHON
+ """
+ self.t_env.execute_sql(sql)
+ self.assert_equals(
+ list(self.t_env.execute_sql("SELECT echo(1)").collect()),
+ [Row(1)])
+
+ def test_all_data_types_string(self):
+ @udf(result_type='BOOLEAN')
+ def boolean_func(bool_param):
+ assert isinstance(bool_param, bool), 'bool_param of wrong type %s !' \
+ % type(bool_param)
+ return bool_param
+
+ @udf(result_type='TINYINT')
+ def tinyint_func(tinyint_param):
+ assert isinstance(tinyint_param, int), 'tinyint_param of wrong type %s !' \
+ % type(tinyint_param)
+ return tinyint_param
+
+ @udf(result_type='SMALLINT')
+ def smallint_func(smallint_param):
+ assert isinstance(smallint_param, int), 'smallint_param of wrong type %s !' \
+ % type(smallint_param)
+ assert smallint_param == 32767, 'smallint_param of wrong value %s' % smallint_param
+ return smallint_param
+
+ @udf(result_type='INT')
+ def int_func(int_param):
+ assert isinstance(int_param, int), 'int_param of wrong type %s !' \
+ % type(int_param)
+ assert int_param == -2147483648, 'int_param of wrong value %s' % int_param
+ return int_param
+
+ @udf(result_type='BIGINT')
+ def bigint_func(bigint_param):
+ assert isinstance(bigint_param, int), 'bigint_param of wrong type %s !' \
+ % type(bigint_param)
+ return bigint_param
+
+ @udf(result_type='BIGINT')
+ def bigint_func_none(bigint_param):
+ assert bigint_param is None, 'bigint_param %s should be None!' % bigint_param
+ return bigint_param
+
+ @udf(result_type='FLOAT')
+ def float_func(float_param):
+ assert isinstance(float_param, float) and float_equal(float_param, 1.23, 1e-6), \
+ 'float_param is wrong value %s !' % float_param
+ return float_param
+
+ @udf(result_type='DOUBLE')
+ def double_func(double_param):
+ assert isinstance(double_param, float) and float_equal(double_param, 1.98932, 1e-7), \
+ 'double_param is wrong value %s !' % double_param
+ return double_param
+
+ @udf(result_type='BYTES')
+ def bytes_func(bytes_param):
+ assert bytes_param == b'flink', \
+ 'bytes_param is wrong value %s !' % bytes_param
+ return bytes_param
+
+ @udf(result_type='STRING')
+ def str_func(str_param):
+ assert str_param == 'pyflink', \
+ 'str_param is wrong value %s !' % str_param
+ return str_param
+
+ @udf(result_type='DATE')
+ def date_func(date_param):
+ from datetime import date
+ assert date_param == date(year=2014, month=9, day=13), \
+ 'date_param is wrong value %s !' % date_param
+ return date_param
+
+ @udf(result_type='TIME')
+ def time_func(time_param):
+ from datetime import time
+ assert time_param == time(hour=12, minute=0, second=0, microsecond=123000), \
+ 'time_param is wrong value %s !' % time_param
+ return time_param
+
+ @udf(result_type='TIMESTAMP(3)')
+ def timestamp_func(timestamp_param):
+ from datetime import datetime
+ assert timestamp_param == datetime(2018, 3, 11, 3, 0, 0, 123000), \
+ 'timestamp_param is wrong value %s !' % timestamp_param
+ return timestamp_param
+
+ @udf(result_type='ARRAY<BIGINT>')
+ def array_func(array_param):
+ assert array_param == [[1, 2, 3]] or array_param == ((1, 2, 3),), \
+ 'array_param is wrong value %s !' % array_param
+ return array_param[0]
+
+ @udf(result_type='MAP<BIGINT, STRING>')
+ def map_func(map_param):
+ assert map_param == {1: 'flink', 2: 'pyflink'}, \
+ 'map_param is wrong value %s !' % map_param
+ return map_param
+
+ @udf(result_type='DECIMAL(38, 18)')
+ def decimal_func(decimal_param):
+ from decimal import Decimal
+ assert decimal_param == Decimal('1000000000000000000.050000000000000000'), \
+ 'decimal_param is wrong value %s !' % decimal_param
+ return decimal_param
+
+ @udf(result_type='DECIMAL(38, 18)')
+ def decimal_cut_func(decimal_param):
+ from decimal import Decimal
+ assert decimal_param == Decimal('1000000000000000000.059999999999999999'), \
+ 'decimal_param is wrong value %s !' % decimal_param
+ return decimal_param
+
+ sink_table = generate_random_table_name()
+ sink_table_ddl = f"""
+ CREATE TABLE {sink_table}(
+ a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE,
+ i BYTES, j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>,
+ o MAP<BIGINT, STRING>, p DECIMAL(38, 18), q DECIMAL(38, 18))
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
+ import datetime
+ import decimal
+ t = self.t_env.from_elements(
+ [(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
+ bytearray(b'flink'), 'pyflink', datetime.date(2014, 9, 13),
+ datetime.time(hour=12, minute=0, second=0, microsecond=123000),
+ datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
+ {1: 'flink', 2: 'pyflink'}, decimal.Decimal('1000000000000000000.05'),
+ decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
+ DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT()),
+ DataTypes.FIELD("b", DataTypes.BIGINT()),
+ DataTypes.FIELD("c", DataTypes.TINYINT()),
+ DataTypes.FIELD("d", DataTypes.BOOLEAN()),
+ DataTypes.FIELD("e", DataTypes.SMALLINT()),
+ DataTypes.FIELD("f", DataTypes.INT()),
+ DataTypes.FIELD("g", DataTypes.FLOAT()),
+ DataTypes.FIELD("h", DataTypes.DOUBLE()),
+ DataTypes.FIELD("i", DataTypes.BYTES()),
+ DataTypes.FIELD("j", DataTypes.STRING()),
+ DataTypes.FIELD("k", DataTypes.DATE()),
+ DataTypes.FIELD("l", DataTypes.TIME()),
+ DataTypes.FIELD("m", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("n", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
+ DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING())),
+ DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
+ DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
+
+ t.select(
+ bigint_func(t.a),
+ bigint_func_none(t.b),
+ tinyint_func(t.c),
+ boolean_func(t.d),
+ smallint_func(t.e),
+ int_func(t.f),
+ float_func(t.g),
+ double_func(t.h),
+ bytes_func(t.i),
+ str_func(t.j),
+ date_func(t.k),
+ time_func(t.l),
+ timestamp_func(t.m),
+ array_func(t.n),
+ map_func(t.o),
+ decimal_func(t.p),
+ decimal_cut_func(t.q)) \
+ .execute_insert(sink_table).wait()
+ actual = source_sink_utils.results()
+ # Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
+ self.assert_equals(actual,
+ ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
+ "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
+ "2018-03-11T03:00:00.123, [1, 2, 3], "
+ "{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
+ "1000000000000000000.059999999999999999]"])
+
def test_create_and_drop_function(self):
t_env = self.t_env
@@ -912,180 +1098,6 @@ class PyFlinkEmbeddedThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTest
super(PyFlinkEmbeddedThreadTests, self).setUp()
self.t_env.get_config().set("python.execution-mode", "thread")
- def test_all_data_types_string(self):
- @udf(result_type='BOOLEAN')
- def boolean_func(bool_param):
- assert isinstance(bool_param, bool), 'bool_param of wrong type %s !' \
- % type(bool_param)
- return bool_param
-
- @udf(result_type='TINYINT')
- def tinyint_func(tinyint_param):
- assert isinstance(tinyint_param, int), 'tinyint_param of wrong type %s !' \
- % type(tinyint_param)
- return tinyint_param
-
- @udf(result_type='SMALLINT')
- def smallint_func(smallint_param):
- assert isinstance(smallint_param, int), 'smallint_param of wrong type %s !' \
- % type(smallint_param)
- assert smallint_param == 32767, 'smallint_param of wrong value %s' % smallint_param
- return smallint_param
-
- @udf(result_type='INT')
- def int_func(int_param):
- assert isinstance(int_param, int), 'int_param of wrong type %s !' \
- % type(int_param)
- assert int_param == -2147483648, 'int_param of wrong value %s' % int_param
- return int_param
-
- @udf(result_type='BIGINT')
- def bigint_func(bigint_param):
- assert isinstance(bigint_param, int), 'bigint_param of wrong type %s !' \
- % type(bigint_param)
- return bigint_param
-
- @udf(result_type='BIGINT')
- def bigint_func_none(bigint_param):
- assert bigint_param is None, 'bigint_param %s should be None!' % bigint_param
- return bigint_param
-
- @udf(result_type='FLOAT')
- def float_func(float_param):
- assert isinstance(float_param, float) and float_equal(float_param, 1.23, 1e-6), \
- 'float_param is wrong value %s !' % float_param
- return float_param
-
- @udf(result_type='DOUBLE')
- def double_func(double_param):
- assert isinstance(double_param, float) and float_equal(double_param, 1.98932, 1e-7), \
- 'double_param is wrong value %s !' % double_param
- return double_param
-
- @udf(result_type='BYTES')
- def bytes_func(bytes_param):
- assert bytes_param == b'flink', \
- 'bytes_param is wrong value %s !' % bytes_param
- return bytes_param
-
- @udf(result_type='STRING')
- def str_func(str_param):
- assert str_param == 'pyflink', \
- 'str_param is wrong value %s !' % str_param
- return str_param
-
- @udf(result_type='DATE')
- def date_func(date_param):
- from datetime import date
- assert date_param == date(year=2014, month=9, day=13), \
- 'date_param is wrong value %s !' % date_param
- return date_param
-
- @udf(result_type='TIME')
- def time_func(time_param):
- from datetime import time
- assert time_param == time(hour=12, minute=0, second=0, microsecond=123000), \
- 'time_param is wrong value %s !' % time_param
- return time_param
-
- @udf(result_type='TIMESTAMP(3)')
- def timestamp_func(timestamp_param):
- from datetime import datetime
- assert timestamp_param == datetime(2018, 3, 11, 3, 0, 0, 123000), \
- 'timestamp_param is wrong value %s !' % timestamp_param
- return timestamp_param
-
- @udf(result_type='ARRAY<BIGINT>')
- def array_func(array_param):
- assert array_param == [[1, 2, 3]] or array_param == ((1, 2, 3),), \
- 'array_param is wrong value %s !' % array_param
- return array_param[0]
-
- @udf(result_type='MAP<BIGINT, STRING>')
- def map_func(map_param):
- assert map_param == {1: 'flink', 2: 'pyflink'}, \
- 'map_param is wrong value %s !' % map_param
- return map_param
-
- @udf(result_type='DECIMAL(38, 18)')
- def decimal_func(decimal_param):
- from decimal import Decimal
- assert decimal_param == Decimal('1000000000000000000.050000000000000000'), \
- 'decimal_param is wrong value %s !' % decimal_param
- return decimal_param
-
- @udf(result_type='DECIMAL(38, 18)')
- def decimal_cut_func(decimal_param):
- from decimal import Decimal
- assert decimal_param == Decimal('1000000000000000000.059999999999999999'), \
- 'decimal_param is wrong value %s !' % decimal_param
- return decimal_param
-
- sink_table = generate_random_table_name()
- sink_table_ddl = f"""
- CREATE TABLE {sink_table}(
- a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE, i BYTES,
- j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o MAP<BIGINT, STRING>,
- p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH ('connector'='test-sink')
- """
- self.t_env.execute_sql(sink_table_ddl)
-
- import datetime
- import decimal
- t = self.t_env.from_elements(
- [(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
- bytearray(b'flink'), 'pyflink', datetime.date(2014, 9, 13),
- datetime.time(hour=12, minute=0, second=0, microsecond=123000),
- datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
- {1: 'flink', 2: 'pyflink'}, decimal.Decimal('1000000000000000000.05'),
- decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
- DataTypes.ROW(
- [DataTypes.FIELD("a", DataTypes.BIGINT()),
- DataTypes.FIELD("b", DataTypes.BIGINT()),
- DataTypes.FIELD("c", DataTypes.TINYINT()),
- DataTypes.FIELD("d", DataTypes.BOOLEAN()),
- DataTypes.FIELD("e", DataTypes.SMALLINT()),
- DataTypes.FIELD("f", DataTypes.INT()),
- DataTypes.FIELD("g", DataTypes.FLOAT()),
- DataTypes.FIELD("h", DataTypes.DOUBLE()),
- DataTypes.FIELD("i", DataTypes.BYTES()),
- DataTypes.FIELD("j", DataTypes.STRING()),
- DataTypes.FIELD("k", DataTypes.DATE()),
- DataTypes.FIELD("l", DataTypes.TIME()),
- DataTypes.FIELD("m", DataTypes.TIMESTAMP(3)),
- DataTypes.FIELD("n", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
- DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING())),
- DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
- DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
-
- t.select(
- bigint_func(t.a),
- bigint_func_none(t.b),
- tinyint_func(t.c),
- boolean_func(t.d),
- smallint_func(t.e),
- int_func(t.f),
- float_func(t.g),
- double_func(t.h),
- bytes_func(t.i),
- str_func(t.j),
- date_func(t.k),
- time_func(t.l),
- timestamp_func(t.m),
- array_func(t.n),
- map_func(t.o),
- decimal_func(t.p),
- decimal_cut_func(t.q)) \
- .execute_insert(sink_table).wait()
- actual = source_sink_utils.results()
- # Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
- self.assert_equals(actual,
- ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
- "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
- "2018-03-11T03:00:00.123, [1, 2, 3], "
- "{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
- "1000000000000000000.059999999999999999]"])
-
# test specify the input_types
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
@@ -1141,6 +1153,11 @@ class CallablePlus(object):
return col + 1
+@udf(input_types=['BIGINT'], result_type='BIGINT')
+def echo(i: str):
+ return i
+
+
if __name__ == '__main__':
import unittest
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 3055a9edacf..bf3ca4fb5c5 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -17,6 +17,7 @@
################################################################################
import unittest
+from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
from pyflink.table.expressions import col
@@ -71,6 +72,19 @@ class UserDefinedTableFunctionTests(object):
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[1, 1, 0]", "+I[2, 2, 0]", "+I[3, 3, 0]", "+I[3, 3, 1]"])
+ def test_table_function_in_sql(self):
+ sql = f"""
+ CREATE TEMPORARY FUNCTION pyfunc AS
+ '{UserDefinedTableFunctionTests.__module__}.identity'
+ LANGUAGE PYTHON
+ """
+ self.t_env.execute_sql(sql)
+ self.assert_equals(
+ list(self.t_env.execute_sql(
+ "SELECT v FROM (VALUES (1)) AS T(id), LATERAL TABLE(pyfunc(id)) AS P(v)"
+ ).collect()),
+ [Row(1)])
+
class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
PyFlinkStreamTableTestCase):
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 5b63cda8e94..c32d8ec9490 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -573,10 +573,6 @@ class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper):
else:
self._accumulator_type = 'ARRAY<{0}>'.format(self._result_type)
- if j_input_types is not None:
- gateway = get_gateway()
- j_input_types = java_utils.to_jarray(
- gateway.jvm.DataType, [_to_java_data_type(i) for i in self._input_types])
if isinstance(self._result_type, DataType):
j_result_type = _to_java_data_type(self._result_type)
else:
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
index c518c3ddcbd..d1bf6d61122 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonAggregateFunction.java
@@ -172,10 +172,9 @@ public class PythonAggregateFunction extends AggregateFunction implements Python
if (inputTypesString != null) {
inputTypes =
- (DataType[])
- Arrays.stream(inputTypesString)
- .map(typeFactory::createDataType)
- .toArray();
+ Arrays.stream(inputTypesString)
+ .map(typeFactory::createDataType)
+ .toArray(DataType[]::new);
}
if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index fe3c1fc9f75..9d14db265f9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -159,10 +159,9 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
if (inputTypesString != null) {
inputTypes =
- (DataType[])
- Arrays.stream(inputTypesString)
- .map(typeFactory::createDataType)
- .toArray();
+ Arrays.stream(inputTypesString)
+ .map(typeFactory::createDataType)
+ .toArray(DataType[]::new);
}
if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
index 3d29d20ecb0..10083679294 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableAggregateFunction.java
@@ -171,10 +171,9 @@ public class PythonTableAggregateFunction extends TableAggregateFunction impleme
TypeInference.Builder builder = TypeInference.newBuilder();
if (inputTypesString != null) {
inputTypes =
- (DataType[])
- Arrays.stream(inputTypesString)
- .map(typeFactory::createDataType)
- .toArray();
+ Arrays.stream(inputTypesString)
+ .map(typeFactory::createDataType)
+ .toArray(DataType[]::new);
}
if (inputTypes != null) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index 19da281bdb9..8d6ac79d813 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -160,10 +160,9 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
if (inputTypesString != null) {
inputTypes =
- (DataType[])
- Arrays.stream(inputTypesString)
- .map(typeFactory::createDataType)
- .toArray();
+ Arrays.stream(inputTypesString)
+ .map(typeFactory::createDataType)
+ .toArray(DataType[]::new);
}
if (inputTypes != null) {