[ 
https://issues.apache.org/jira/browse/FLINK-21434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Zhong reassigned FLINK-21434:
---------------------------------

    Assignee: Wei Zhong

> When a UDAF returns a ROW type with more than 14 fields, a crash
> happens
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-21434
>                 URL: https://issues.apache.org/jira/browse/FLINK-21434
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.1
>         Environment: python 3.7.5
> pyflink 1.12.1
>            Reporter: awayne
>            Assignee: Wei Zhong
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
>  Code (a simple UDAF that returns a Row containing 15 fields):
> {code:python}
> from pyflink.common import Row
> from pyflink.table.udf import AggregateFunction, udaf
> from pyflink.table import DataTypes, EnvironmentSettings, 
> StreamTableEnvironment
> class Test(AggregateFunction):
>   def create_accumulator(self):
>     return Row(0, 0)
>   def get_value(self, accumulator):
>     return Row(1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23,
>                1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23)
>   def accumulate(self, accumulator, a, b):
>     pass
>   def get_result_type(self):
>     return DataTypes.ROW([
>         DataTypes.FIELD("f1", DataTypes.FLOAT()),
>         DataTypes.FIELD("f2", DataTypes.FLOAT()),
>         DataTypes.FIELD("f3", DataTypes.FLOAT()),
>         DataTypes.FIELD("f4", DataTypes.FLOAT()),
>         DataTypes.FIELD("f5", DataTypes.FLOAT()),
>         DataTypes.FIELD("f6", DataTypes.FLOAT()),
>         DataTypes.FIELD("f7", DataTypes.FLOAT()),
>         DataTypes.FIELD("f8", DataTypes.FLOAT()),
>         DataTypes.FIELD("f9", DataTypes.FLOAT()),
>         DataTypes.FIELD("f10", DataTypes.FLOAT()),
>         DataTypes.FIELD("f11", DataTypes.FLOAT()),
>         DataTypes.FIELD("f12", DataTypes.FLOAT()),
>         DataTypes.FIELD("f13", DataTypes.FLOAT()),
>         DataTypes.FIELD("f14", DataTypes.FLOAT()),
>         DataTypes.FIELD("f15", DataTypes.FLOAT())
>     ])
>   def get_accumulator_type(self):
>     return DataTypes.ROW([
>         DataTypes.FIELD("f1", DataTypes.BIGINT()),
>         DataTypes.FIELD("f2", DataTypes.BIGINT())])
> def udaf_test():
>   env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>   table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>   test = udaf(Test())
>   table_env.execute_sql("""
>       CREATE TABLE print_sink (
>           `name` STRING,
>           `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT,
>                     f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT,
>                     f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT,
>                     f13 FLOAT, f14 FLOAT, f15 FLOAT>
>       ) WITH (
>           'connector' = 'print'
>       )
>   """)
>   table = table_env.from_elements([(1, 2, "Lee")], ['value', 'count', 'name'])
>   result_table = table.group_by(table.name)\
>                       .select(table.name, test(table.value, table.count))
>   result_table.execute_insert("print_sink").wait()
> if __name__ == "__main__":
>   udaf_test()
> {code}
> Exception:
> {code:java}
> Caused by: java.io.EOFException
>       at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
>       at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
>       at 
> org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
>       at 
> org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
>       at 
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>       at 
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>       at 
> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to