[
https://issues.apache.org/jira/browse/FLINK-30637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-30637:
----------------------------
Fix Version/s: 1.17.0
> In linux-aarch64 environment, using “is” judgment to match the window type of
> overwindow have returned incorrect matching results
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30637
> URL: https://issues.apache.org/jira/browse/FLINK-30637
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.6
> Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64
> (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1
> SMP Wed Mar 30 02:43:08 UTC 2022
>
> pyflink-version:1.13.6
> Reporter: Xin Chen
> Assignee: Xin Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> In a linux-aarch64 environment, the check “window_type is
> OverWindow.ROW_UNBOUNDED_FOLLOWING” in the
> PandasBatchOverWindowAggregateFunctionOperation class of the pyflink source
> code returns the wrong result.
> For example, when window_type is 6, it represents the window type
> ‘ROW_UNBOUNDED_FOLLOWING’, but “window_type is
> OverWindow.ROW_UNBOUNDED_FOLLOWING” returns False because window_type is not
> the same object (its memory address differs). As a result the wrong window
> type is selected, such as the row sliding window, so the wrong input data is
> assembled for the Python UDF and its results are wrong.
>
> Specifically, the pyflink unit test case is
> ‘test_over_window_aggregate_function’ in
> ‘pyflink\table\tests\test_pandas_udaf.py’. It produces incorrect results when
> I run it with pytest on a linux-aarch64 system. I reduced this unit test to
> the following code and executed it in Flink standalone mode on an aarch64
> system, and got the same incorrect result:
>
> {code:python}
> import datetime
> import unittest
>
> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
> from pyflink.table.udf import udaf, AggregateFunction
>
>
> class MaxAdd(AggregateFunction, unittest.TestCase):
>
>     def open(self, function_context):
>         mg = function_context.get_metric_group()
>         self.counter = mg.add_group("key", "value").counter("my_counter")
>         self.counter_sum = 0
>
>     def get_value(self, accumulator):
>         # counter
>         self.counter.inc(10)
>         self.counter_sum += 10
>         return accumulator[0]
>
>     def create_accumulator(self):
>         return []
>
>     def accumulate(self, accumulator, *args):
>         result = 0
>         for arg in args:
>             result += arg.max()
>         accumulator.append(result)
>
>
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def mean_udaf(v):
>     import logging
>     logging.error("debug")
>     logging.error(v)
>     return v.mean()
>
>
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
> t_env.get_config().get_configuration().set_string("parallelism.default", "2")
> t_env.get_config().get_configuration().set_string(
>     "python.fn-execution.bundle.size", "1")
>
> t = t_env.from_elements(
>     [
>         (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
>     ],
>     DataTypes.ROW(
>         [DataTypes.FIELD("a", DataTypes.TINYINT()),
>          DataTypes.FIELD("b", DataTypes.SMALLINT()),
>          DataTypes.FIELD("c", DataTypes.INT()),
>          DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
>
> # sink
> t_env.execute_sql("""
>     CREATE TABLE mySink (
>         c INT,
>         d FLOAT
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
>
> t_env.create_temporary_system_function("mean_udaf", mean_udaf)
> t_env.register_function("max_add", udaf(MaxAdd(),
>                                         result_type=DataTypes.INT(),
>                                         func_type="pandas"))
> t_env.register_table("T", t)
> t_env.execute_sql("""
>     insert into mySink
>     select
>         max_add(b, c)
>         over (PARTITION BY a ORDER BY rowtime
>               ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
>         mean_udaf(b)
>         over (PARTITION BY a ORDER BY rowtime
>               ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
>     from T
> """).wait()
>
> '''
> assert_equals(actual,
>               ["5,4.3333335",
>                "13,5.5",
>                "6,4.3333335"])
> '''
> {code}
> The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the
> actual results are List(5,2.0, 13,5.5, 4,2.5). For ‘mean_udaf’ and
> ‘OverWindow.ROW_UNBOUNDED_FOLLOWING’ in the code, by adding error logging, I
> found that when window_type is 6 and 'OverWindow.ROW_UNBOUNDED_FOLLOWING'
> is also 6, the following check in the pyflink source code evaluated to False.
> {code:python}
> # pyflink\fn_execution\operations.py (line 273)
> elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
>     # row unbounded following window
>     window_start = window.lower_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         series_slices = [s.iloc[start: input_cnt] for s in input_series]
>         result.append(func(series_slices))
> {code}
> It then fell through to the row sliding window branch to assemble the input
> data of mean_udaf:
> {code:python}
> # pyflink\fn_execution\operations.py (line 280)
> else:
>     # row sliding window
>     window_start = window.lower_boundary
>     window_end = window.upper_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         end = min(j + window_end + 1, input_cnt)
>         series_slices = [s.iloc[start: end] for s in input_series]
>         result.append(func(series_slices))
> {code}
> Obviously, that's not right. The right choice is made in the x86 environment.
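
To make the difference between the two branches concrete, here is a minimal sketch that replays both slicing rules on column b of the reduced case, using plain lists instead of pandas Series. The helper names are hypothetical, and two values are assumptions rather than facts from the report: lower_boundary = -1 for "1 PRECEDING" (implied by `start = max(j + window_start, 0)`) and upper_boundary = 0 for the mis-selected sliding window (inferred from the reported wrong means).

```python
from statistics import mean


def row_unbounded_following(values, lower_boundary, func):
    """Intended branch: each row sees values[start:] up to the partition end."""
    n = len(values)
    return [func(values[max(j + lower_boundary, 0):n]) for j in range(n)]


def row_sliding(values, lower_boundary, upper_boundary, func):
    """Fallback branch: each row sees values[start:end] only."""
    n = len(values)
    result = []
    for j in range(n):
        start = max(j + lower_boundary, 0)
        end = min(j + upper_boundary + 1, n)
        result.append(func(values[start:end]))
    return result


b = [2, 3, 8]        # column b of the reduced case, ordered by rowtime
lower_boundary = -1  # assumption: "1 PRECEDING" is encoded as -1
upper_boundary = 0   # assumption: inferred from the reported wrong output

unbounded = row_unbounded_following(b, lower_boundary, mean)
sliding = row_sliding(b, lower_boundary, upper_boundary, mean)
print(unbounded)
print(sliding)
```

Under these assumptions the unbounded-following rule gives roughly 4.33, 4.33, 5.5 in row order, and the sliding rule gives 2.0, 2.5, 5.5. These are the same values as the expected and actual means quoted above, although the sink prints the rows in a different order.
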
> The reason is that window_type's memory address differs from that of
> ‘OverWindow.ROW_UNBOUNDED_FOLLOWING’ in the linux-aarch64 environment,
> whereas they are the same in the linux-x86 environment. Why the memory
> addresses differ is not yet known, but I observed that window_type
> comes from 'serialized_fn.windows':
> {code:python}
> def __init__(self, spec):
>     super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
>     self.windows = [window for window in self.spec.serialized_fn.windows]
> {code}
> Perhaps the grpc or protobuf dependencies, or the serialization operations in
> the aarch64 environment, affect the memory address of the int values. I'll
> explore the underlying reasons later.
>
> Solution and suggestion:
> Since window selection needs to compare the values of two integer
> variables (window_type and OverWindow.ROW_UNBOUNDED_FOLLOWING), I recommend
> replacing ‘is’ with ‘==’ in the window type matching. That also prevents
> erroneous results in cases where Python's small integer object pool does not
> apply, which may also affect the memory address. This modification has been
> verified to behave correctly in both x86 and aarch64 environments, for both
> the unit test case and the reduced case above.
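
The difference between the two operators can be shown in isolation. This is a minimal sketch, not the pyflink code: ROW_UNBOUNDED_FOLLOWING is a hypothetical stand-in for OverWindow.ROW_UNBOUNDED_FOLLOWING, and BoxedInt is a hypothetical int subclass used only to guarantee an integer that equals 6 but is a separate object. Where the distinct object comes from on aarch64 is still unknown per the report, so the subclass is an assumption for illustration.

```python
# Hypothetical stand-in for OverWindow.ROW_UNBOUNDED_FOLLOWING (value 6 per the report).
ROW_UNBOUNDED_FOLLOWING = 6


class BoxedInt(int):
    """Hypothetical int subclass: equal in value to an int, but never the same object."""


def matches_by_identity(window_type):
    # Compares object identity; whether equal ints share an object is an
    # interpreter implementation detail, not a language guarantee.
    return window_type is ROW_UNBOUNDED_FOLLOWING


def matches_by_equality(window_type):
    # Compares integer values, which is what window selection needs.
    return window_type == ROW_UNBOUNDED_FOLLOWING


# Assumption: the deserialized window type arrives as a distinct object.
window_type = BoxedInt(6)

print(matches_by_identity(window_type))
print(matches_by_equality(window_type))
```

With a distinct object of value 6, only the equality check selects the unbounded-following branch. This is why ‘==’ is the safer comparison regardless of how the interpreter or a dependency allocates the integer.
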
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)