[ 
https://issues.apache.org/jira/browse/FLINK-30637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo resolved FLINK-30637.
----------------------------------
    Fix Version/s: 1.16.1
                   1.15.4
         Assignee: Xin Chen
       Resolution: Fixed

Merged into master via d053867fb5c0fc647ea9266aab35598d7f3fc5c4
Merged into release-1.16 via eca940c5bf9e17c90dbb6f35e4ba370027137368
Merged into release-1.15 via 4035d61a2756ec16046fb687f533be0501fbbd35

> In linux-aarch64 environment, using “is” judgment to match the window type of 
> overwindow have returned incorrect matching results
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30637
>                 URL: https://issues.apache.org/jira/browse/FLINK-30637
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.6
>         Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64
> (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1 
> SMP Wed Mar 30 02:43:08 UTC 2022
>  
> pyflink-version:1.13.6
>            Reporter: Xin Chen
>            Assignee: Xin Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.1, 1.15.4
>
>
> In the linux-aarch64 environment, the check “window_type is 
> OverWindow.ROW_UNBOUNDED_FOLLOWING” in the 
> PandasBatchOverWindowAggregateFunctionOperation class of the pyflink source 
> code returns the wrong result.
> For example, when window_type is 6, which represents the window type 
> ‘ROW_UNBOUNDED_FOLLOWING’, “window_type is 
> OverWindow.ROW_UNBOUNDED_FOLLOWING” returns false because window_type and 
> the constant are different objects in memory. The operation then falls 
> through to the wrong window type (row sliding window), so the input data of 
> the Python UDF is assembled incorrectly and the UDF produces wrong results.
>  
> Specifically, the pyflink unit test case is 
> ‘test_over_window_aggregate_function’ in 
> ‘pyflink\table\tests\test_pandas_udaf.py’. It produces incorrect results 
> when I run it with pytest on a linux-aarch64 system. I reduced this unit 
> test case to the following code, executed it in Flink standalone mode on the 
> aarch64 system, and got the same wrong result:
>  
> {code:python}
> import unittest
> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
> from pyflink.table.udf import udaf, AggregateFunction
> class MaxAdd(AggregateFunction, unittest.TestCase):
>     def open(self, function_context):
>         mg = function_context.get_metric_group()
>         self.counter = mg.add_group("key", "value").counter("my_counter")
>         self.counter_sum = 0
>     def get_value(self, accumulator):
>         # counter
>         self.counter.inc(10)
>         self.counter_sum += 10
>         return accumulator[0]
>     def create_accumulator(self):
>         return []
>     def accumulate(self, accumulator, *args):
>         result = 0
>         for arg in args:
>             result += arg.max()
>         accumulator.append(result)
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def mean_udaf(v):
>     import logging
>     logging.error("debug")
>     logging.error(v)
>     return v.mean()
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
> t_env.get_config().get_configuration().set_string("parallelism.default", "2")
> t_env.get_config().get_configuration().set_string(
>     "python.fn-execution.bundle.size", "1")
> import datetime
> t = t_env.from_elements(
>     [
>         (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
>     ],
>     DataTypes.ROW(
>         [DataTypes.FIELD("a", DataTypes.TINYINT()),
>          DataTypes.FIELD("b", DataTypes.SMALLINT()),
>          DataTypes.FIELD("c", DataTypes.INT()),
>          DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
> # sink
> t_env.execute_sql("""
>         CREATE TABLE mySink (
>           c INT,
>           d FLOAT 
>         ) WITH (
>           'connector' = 'print'
>         )
>     """)
> t_env.create_temporary_system_function("mean_udaf", mean_udaf)
> t_env.register_function("max_add", udaf(MaxAdd(),
>                                         result_type=DataTypes.INT(),
>                                         func_type="pandas"))
> t_env.register_table("T", t)
> t_env.execute_sql("""
>         insert into mySink
>         select
>          max_add(b, c)
>          over (PARTITION BY a ORDER BY rowtime
>          ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
>          mean_udaf(b)
>          over (PARTITION BY a ORDER BY rowtime
>          ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
>         from T
>     """).wait()
> '''
> assert_equals(actual,
>               ["5,4.3333335",
>                "13,5.5",
>                "6,4.3333335"])
> '''{code}
> The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the 
> actual results are List(5,2.0, 13,5.5, 4,2.5). For ‘mean_udaf’ with the 
> ‘UNBOUNDED FOLLOWING’ window in the code, I added error logging and found 
> that although window_type is 6 and 'OverWindow.ROW_UNBOUNDED_FOLLOWING' is 
> also 6, the following check in the pyflink source code evaluated to false.
> {code:python}
> # pyflink\fn_execution\operations.py (line 273)
> elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
>     # row unbounded following window
>     window_start = window.lower_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         series_slices = [s.iloc[start: input_cnt] for s in input_series]
>         result.append(func(series_slices)){code}
> As a result, it fell through to the row sliding window branch to assemble 
> the input data of mean_udaf:
> {code:python}
> # pyflink\fn_execution\operations.py (line 280)
> else:
>     # row sliding window
>     window_start = window.lower_boundary
>     window_end = window.upper_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         end = min(j + window_end + 1, input_cnt)
>         series_slices = [s.iloc[start: end] for s in input_series]
>         result.append(func(series_slices)){code}
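
A minimal sketch of the two branches quoted above, using plain Python lists instead of pandas Series and the mean in place of the UDF. The function names are hypothetical. The boundary values are assumptions: lower_boundary = -1 for "1 PRECEDING", and upper_boundary = 0, which is inferred from the reported output and is not stated in the issue. Under these assumptions, the sliding-window fallback yields the values 2.0, 2.5 and 5.5 seen in the actual results for column b = [2, 3, 8].

```python
def unbounded_following_means(values, lower_boundary):
    """Mean over [j + lower_boundary, end of partition] for each row j."""
    input_cnt = len(values)
    result = []
    for j in range(input_cnt):
        start = max(j + lower_boundary, 0)
        window = values[start:input_cnt]
        result.append(sum(window) / len(window))
    return result


def sliding_means(values, lower_boundary, upper_boundary):
    """Mean over [j + lower_boundary, j + upper_boundary] for each row j."""
    input_cnt = len(values)
    result = []
    for j in range(input_cnt):
        start = max(j + lower_boundary, 0)
        end = min(j + upper_boundary + 1, input_cnt)
        window = values[start:end]
        result.append(sum(window) / len(window))
    return result


b = [2, 3, 8]  # column b from the reproduction script

# Intended branch: ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING
expected = unbounded_following_means(b, lower_boundary=-1)

# Fallback branch, assuming upper_boundary is 0 (an assumption, see above)
fallback = sliding_means(b, lower_boundary=-1, upper_boundary=0)

print(expected)
print(fallback)
```

The only difference between the two branches is the end of each slice, which is why a failed type match silently truncates every window instead of raising an error.
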
> Obviously, that's not right. The right branch is chosen in the x86 
> environment. The reason is that the memory address of window_type differs 
> from that of ‘OverWindow.ROW_UNBOUNDED_FOLLOWING’ in the linux-aarch64 
> environment, whereas the two are the same in the linux-x86 environment. Why 
> the memory addresses differ is not yet known, but I observed that 
> window_type comes from 'serialized_fn.windows':
> {code:python}
> def __init__(self, spec):
>     super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
>     self.windows = [window for window in self.spec.serialized_fn.windows]
> {code}
> Perhaps the grpc or protobuf dependencies, or the serialization operations, 
> in the aarch64 environment affect the memory address of the int variables. 
> I'll explore the underlying reasons later.
>  
> Solution and suggestion:
> Since the window selection needs to compare the values of two integer 
> variables (window_type and OverWindow.ROW_UNBOUNDED_FOLLOWING), I recommend 
> replacing ‘is’ with ‘==’ in the window type matching. That also prevents 
> erroneous results in cases where Python's small integer object pool does not 
> apply, which can likewise affect the memory address. This modification has 
> been verified to work correctly in both x86 and aarch64 environments, with 
> both this unit test case and the reduced case above.
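
A small sketch of why identity and equality can disagree for integers. Both classes below are hypothetical stand-ins: OverWindow mimics only the constant value 6 from the report, and WireInt is an int subclass used to force a distinct object deterministically. It does not claim to reproduce what protobuf does on aarch64, whose cause the report leaves open.

```python
class OverWindow:
    # Hypothetical stand-in for the generated enum; only the value 6 for
    # ROW_UNBOUNDED_FOLLOWING is taken from the report.
    ROW_UNBOUNDED_FOLLOWING = 6


class WireInt(int):
    """Hypothetical int subclass: equal in value to a plain int,
    but every instance is a separate object."""


def matches_by_identity(window_type):
    # The original check: true only if both names refer to the same object.
    return window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING


def matches_by_value(window_type):
    # The proposed check: true whenever the integer values are equal.
    return window_type == OverWindow.ROW_UNBOUNDED_FOLLOWING


window_type = WireInt(6)  # same value as the constant, different object

identity_result = matches_by_identity(window_type)
value_result = matches_by_value(window_type)
print(identity_result, value_result)
```

Identity of equal integers is an implementation detail of the interpreter, so a value comparison is the safer check regardless of platform.
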
>  

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
