[
https://issues.apache.org/jira/browse/FLINK-30637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xin Chen updated FLINK-30637:
-----------------------------
Description:
In a linux-aarch64 environment, the check "window_type is
OverWindow.ROW_UNBOUNDED_FOLLOWING" in the
PandasBatchOverWindowAggregateFunctionOperation class of the PyFlink source
code returns the wrong result.
For example, when window_type is 6 it represents the window type
ROW_UNBOUNDED_FOLLOWING, yet "window_type is
OverWindow.ROW_UNBOUNDED_FOLLOWING" evaluates to false, because "is" compares
object identity and the deserialized window_type is a different int object
from the constant (its memory address differs). The operator then selects the
wrong window type, such as a row sliding window, so the wrong input data is
assembled for the Python UDF and wrong results are produced.
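The root cause can be shown in plain Python, independent of Flink: "is" tests object identity, not value equality, and CPython only caches a small range of integers, so two equal ints are not guaranteed to be the same object. A minimal sketch (the constant value 1000 is illustrative, chosen outside CPython's small-int cache):

```python
ROW_UNBOUNDED_FOLLOWING = 1000  # illustrative constant, outside the small-int cache

def match_with_is(window_type):
    # Fragile: identity comparison; depends on int object caching.
    return window_type is ROW_UNBOUNDED_FOLLOWING

def match_with_eq(window_type):
    # Robust: value comparison; always correct for equal ints.
    return window_type == ROW_UNBOUNDED_FOLLOWING

# A deserialized value is a fresh int object with the same value:
wt = int("1000")
print(match_with_eq(wt))  # True
print(match_with_is(wt))  # False on CPython: same value, different object
```

Whether "is" happens to succeed therefore depends on interpreter build and platform details, which is why the test passes on x86 but fails on aarch64.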
Specifically, the PyFlink unit test case is
test_over_window_aggregate_function in
pyflink/table/tests/test_pandas_udaf.py. It behaves incorrectly when I
run it with pytest on a linux-aarch64 system. I reduced this test case to the
following code and executed it in Flink standalone mode on the aarch64 system,
and got the same wrong result:
{code:python}
import unittest

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.udf import udaf, AggregateFunction


class MaxAdd(AggregateFunction, unittest.TestCase):

    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)


@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    import logging
    logging.error("debug")
    logging.error(v)
    return v.mean()


t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
t_env.get_config().get_configuration().set_string("parallelism.default", "2")
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.bundle.size", "1")

import datetime

t = t_env.from_elements(
    [
        (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
    ],
    DataTypes.ROW(
        [DataTypes.FIELD("a", DataTypes.TINYINT()),
         DataTypes.FIELD("b", DataTypes.SMALLINT()),
         DataTypes.FIELD("c", DataTypes.INT()),
         DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

# sink
t_env.execute_sql("""
    CREATE TABLE mySink (
        c INT,
        d FLOAT
    ) WITH (
        'connector' = 'print'
    )
""")
t_env.create_temporary_system_function("mean_udaf", mean_udaf)
t_env.register_function("max_add", udaf(MaxAdd(),
                                        result_type=DataTypes.INT(),
                                        func_type="pandas"))
t_env.register_table("T", t)
t_env.execute_sql("""
    insert into mySink
    select
        max_add(b, c)
            over (PARTITION BY a ORDER BY rowtime
                  ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
        mean_udaf(b)
            over (PARTITION BY a ORDER BY rowtime
                  ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
    from T
""").wait()
'''
assert_equals(actual,
              ["5,4.3333335",
               "13,5.5",
               "6,4.3333335"])
'''{code}
The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the
actual results are List(5,2.0, 13,5.5, 4,2.5). For mean_udaf, using the
error logging added in the code above, I found the following UDF inputs in
the taskmanager log:
{code}
// linux-aarch64:
2023-01-11 10:08:02,140 ERROR
/home/python3711/lib/python3.7/site-packages/pyflink/fn_execution/operations.py:231
[] - debug
2023-01-11 10:08:02,141 ERROR
/home/python3711/lib/python3.7/site-packages/pyflink/fn_execution/operations.py:232
[] - [0 2
Name: b, dtype: int16]
2023-01-11 10:08:02,163 ERROR
/home/python3711/lib/python3.7/site-packages/pyflink/fn_execution/operations.py:319
[] - [0 2
1 3
Name: b, dtype: int16]
2023-01-11 10:08:02,165 ERROR
/home/python3711/lib/python3.7/site-packages/pyflink/fn_execution/operations.py:319
[] - [0 2
1 3
2 8
Name: b, dtype: int16]{code}
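The logged inputs are growing prefixes ([2], [2, 3], [2, 3, 8]), which is the shape of an UNBOUNDED PRECEDING window, while ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING should hand the UDF suffix windows starting one row back. The expected per-row inputs can be checked with pandas (assuming rowtime order b = [2, 3, 8]):

```python
import pandas as pd

# Column b of the test partition, in rowtime order.
b = pd.Series([2, 3, 8], dtype="int16")

# ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING: for row i the window
# covers rows max(0, i - 1) .. end of the partition.
windows = [b.iloc[max(0, i - 1):] for i in range(len(b))]
print([w.tolist() for w in windows])       # [[2, 3, 8], [2, 3, 8], [3, 8]]
print([float(w.mean()) for w in windows])  # [4.333..., 4.333..., 5.5]
```

These means match the expected results (4.3333335 is the float32 rendering of 13/3), whereas the prefix windows in the log explain the wrong 2.0 and 2.5 means in the actual output.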
But on linux-x86, the log shows the correct window inputs.
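A likely fix, sketched here since the actual patch lives in the linked pull request, is to replace the identity checks in the operator's window-type dispatch with value equality (the enum values and names below are illustrative, not copied from the PyFlink source):

```python
# Illustrative window-type codes; the real values come from the
# flink-fn-execution protobuf definitions.
ROW_SLIDING = 2
ROW_UNBOUNDED_PRECEDING = 5
ROW_UNBOUNDED_FOLLOWING = 6

def dispatch_window(window_type):
    # Before the fix: `window_type is ROW_UNBOUNDED_FOLLOWING` could be
    # False on aarch64 for a deserialized int with the right value,
    # silently falling through to the sliding-window branch.
    if window_type == ROW_UNBOUNDED_PRECEDING:
        return "rows: unbounded preceding"
    elif window_type == ROW_UNBOUNDED_FOLLOWING:
        return "rows: unbounded following"
    else:
        return "rows: sliding"

print(dispatch_window(int("6")))  # "rows: unbounded following"
```

With `==`, the result is the same regardless of which int object carries the value, so the dispatch no longer depends on platform-specific integer caching.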
was:
In a linux-aarch64 environment, the check "window_type is
OverWindow.ROW_UNBOUNDED_FOLLOWING" in the
PandasBatchOverWindowAggregateFunctionOperation class of the PyFlink source
code returns the wrong result.
> In a linux-aarch64 environment, using "is" to match the over-window type
> returns incorrect matching results
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30637
> URL: https://issues.apache.org/jira/browse/FLINK-30637
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.6
> Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64
> (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1
> SMP Wed Mar 30 02:43:08 UTC 2022
>
> pyflink-version:1.13.6
> Reporter: Xin Chen
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)