[
https://issues.apache.org/jira/browse/FLINK-21434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wei Zhong reassigned FLINK-21434:
---------------------------------
Assignee: Wei Zhong
> When a UDAF returns a ROW type with more than 14 fields, a crash
> happens
> --------------------------------------------------------------------------------------
>
> Key: FLINK-21434
> URL: https://issues.apache.org/jira/browse/FLINK-21434
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.1
> Environment: python 3.7.5
> pyflink 1.12.1
> Reporter: awayne
> Assignee: Wei Zhong
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.13.0, 1.12.3
>
>
> Code (a simple UDAF that returns a Row containing 15 fields):
> {code:python}
> from pyflink.common import Row
> from pyflink.table.udf import AggregateFunction, udaf
> from pyflink.table import DataTypes, EnvironmentSettings,
> StreamTableEnvironment
> class Test(AggregateFunction):
> def create_accumulator(self):
> return Row(0, 0)
> def get_value(self, accumulator):
> return Row(1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23,
> 1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23)
> def accumulate(self, accumulator, a, b):
> pass
> def get_result_type(self):
> return DataTypes.ROW([
> DataTypes.FIELD("f1", DataTypes.FLOAT()),
> DataTypes.FIELD("f2", DataTypes.FLOAT()),
> DataTypes.FIELD("f3", DataTypes.FLOAT()),
> DataTypes.FIELD("f4", DataTypes.FLOAT()),
> DataTypes.FIELD("f5", DataTypes.FLOAT()),
> DataTypes.FIELD("f6", DataTypes.FLOAT()),
> DataTypes.FIELD("f7", DataTypes.FLOAT()),
> DataTypes.FIELD("f8", DataTypes.FLOAT()),
> DataTypes.FIELD("f9", DataTypes.FLOAT()),
> DataTypes.FIELD("f10", DataTypes.FLOAT()),
> DataTypes.FIELD("f11", DataTypes.FLOAT()),
> DataTypes.FIELD("f12", DataTypes.FLOAT()),
> DataTypes.FIELD("f13", DataTypes.FLOAT()),
> DataTypes.FIELD("f14", DataTypes.FLOAT()),
> DataTypes.FIELD("f15", DataTypes.FLOAT())
> ])
> def get_accumulator_type(self):
> return DataTypes.ROW([
> DataTypes.FIELD("f1", DataTypes.BIGINT()),
> DataTypes.FIELD("f2", DataTypes.BIGINT())])
> def udaf_test():
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> test = udaf(Test())
> table_env.execute_sql("""
> CREATE TABLE print_sink (
> `name` STRING,
> `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT,
> f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT,
> f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT,
> f13 FLOAT, f14 FLOAT, f15 FLOAT>
> ) WITH (
> 'connector' = 'print'
> )
> """)
> table = table_env.from_elements([(1, 2, "Lee")], ['value', 'count', 'name'])
> result_table = table.group_by(table.name)\
> .select(table.name, test(table.value, table.count))
> result_table.execute_insert("print_sink").wait()
> if __name__ == "__main__":
> udaf_test()
> {code}
> Exception:
> {code:java}
> Caused by: java.io.EOFException
> at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
> at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
> at
> org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
> at
> org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
> at
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
> at
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
> at
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)