[ 
https://issues.apache.org/jira/browse/FLINK-30637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-30637:
-----------------------------
    Description: 
In a linux-aarch64 environment, the check "window_type is 
OverWindow.ROW_UNBOUNDED_FOLLOWING" in the 
PandasBatchOverWindowAggregateFunctionOperation class of the PyFlink source 
code returns the wrong result.

For example, when window_type is 6, which represents the window type 
ROW_UNBOUNDED_FOLLOWING, "window_type is 
OverWindow.ROW_UNBOUNDED_FOLLOWING" returns False because window_type and the 
enum constant are distinct objects at different memory addresses. The wrong 
window type is then selected, such as a row sliding window, so the wrong input 
data is assembled for the Python UDF and wrong results are produced.
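
As a minimal standalone sketch of the Python semantics involved (illustrative 
only, not PyFlink code): "is" compares object identity while "==" compares 
values, and two equal ints are not guaranteed to be the same object:
{code:python}
# Minimal sketch (not PyFlink code): "is" checks identity, "==" checks value.
a = 6
b = 6
print(a is b)    # True on CPython, but only because small ints (-5..256) are cached

x = int("1000")  # an int built at runtime, e.g. by a deserializer
y = 1000
print(x == y)    # True: equal values
print(x is y)    # typically False: two distinct objects at different addresses
{code}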

 

Specifically, the PyFlink unit test case is 
'test_over_window_aggregate_function' in 
'pyflink\table\tests\test_pandas_udaf.py'. It behaves incorrectly when I 
execute it with pytest on a linux-aarch64 system. I reduced this unit test to 
the following code and executed it in Flink standalone mode on an aarch64 
system, and got the same erroneous result:

 
{code:python}
import unittest
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.udf import udaf, AggregateFunction


class MaxAdd(AggregateFunction, unittest.TestCase):

    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)


@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    import logging
    logging.error("debug")
    logging.error(v)
    return v.mean()


t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
t_env.get_config().get_configuration().set_string("parallelism.default", "2")
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.bundle.size", "1")

import datetime

t = t_env.from_elements(
    [
        (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
    ],
    DataTypes.ROW(
        [DataTypes.FIELD("a", DataTypes.TINYINT()),
         DataTypes.FIELD("b", DataTypes.SMALLINT()),
         DataTypes.FIELD("c", DataTypes.INT()),
         DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

# sink
t_env.execute_sql("""
        CREATE TABLE mySink (
          c INT,
          d FLOAT 
        ) WITH (
          'connector' = 'print'
        )
    """)

t_env.create_temporary_system_function("mean_udaf", mean_udaf)
t_env.register_function("max_add", udaf(MaxAdd(),
                                        result_type=DataTypes.INT(),
                                        func_type="pandas"))
t_env.register_table("T", t)
t_env.execute_sql("""
        insert into mySink
        select
         max_add(b, c)
         over (PARTITION BY a ORDER BY rowtime
         ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
         mean_udaf(b)
         over (PARTITION BY a ORDER BY rowtime
         ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
        from T
    """).wait()
'''
assert_equals(actual,
              ["5,4.3333335",
               "13,5.5",
               "6,4.3333335"])
'''{code}
The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the 
actual results are List(5,2.0, 13,5.5, 4,2.5). For 'mean_udaf' and its 
'ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING' window, by adding error 
logs I found that even though window_type is 6 and 
'OverWindow.ROW_UNBOUNDED_FOLLOWING' also represents 6, the following check in 
the PyFlink source code returned False:
{code:python}
# pyflink\fn_execution\operations.py (line 273)
elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
    # row unbounded following window
    window_start = window.lower_boundary
    for j in range(input_cnt):
        start = max(j + window_start, 0)
        series_slices = [s.iloc[start: input_cnt] for s in input_series]
        result.append(func(series_slices)){code}
It therefore fell through to the row sliding window branch to assemble the input data of mean_udaf:
{code:python}
# pyflink\fn_execution\operations.py (line 280)
else:
    # row sliding window
    window_start = window.lower_boundary
    window_end = window.upper_boundary
    for j in range(input_cnt):
        start = max(j + window_start, 0)
        end = min(j + window_end + 1, input_cnt)
        series_slices = [s.iloc[start: end] for s in input_series]
        result.append(func(series_slices)){code}
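To connect the wrong branch to the observed numbers: for the a=1 partition, 
column b is [2, 3, 8]. The following standalone sketch replays both branches. 
I assume lower_boundary is -1 (1 PRECEDING) and that upper_boundary stays at 0 
because an unbounded-following window never sets it; these boundary values are 
inferred from the observed output, not read from the PyFlink sources:
{code:python}
import pandas as pd

# Column b of the a=1 partition, in rowtime order.
s = pd.Series([2, 3, 8], dtype="int16")
input_cnt = len(s)
window_start, window_end = -1, 0  # 1 PRECEDING; upper boundary assumed unset (0)

# Correct branch (row unbounded following): every slice extends to the end.
correct = [s.iloc[max(j + window_start, 0): input_cnt].mean()
           for j in range(input_cnt)]
print(correct)  # [4.333..., 4.333..., 5.5] -- the expected means

# Wrong branch (row sliding window): the unset upper boundary truncates slices.
wrong = [s.iloc[max(j + window_start, 0): min(j + window_end + 1, input_cnt)].mean()
         for j in range(input_cnt)]
print(wrong)    # [2.0, 2.5, 5.5] -- matches the erroneous results above
{code}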
Obviously, selecting the row sliding window branch is not right; in an x86 environment the correct branch is taken.

The reason is that window_type's memory address differs from that of 
'OverWindow.ROW_UNBOUNDED_FOLLOWING' in the linux-aarch64 environment, whereas 
the two are the same object in the linux-x86 environment. Why the addresses 
differ is not yet known, but I observed that window_type comes from 
'serialized_fn.windows':
{code:python}
def __init__(self, spec):
    super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
    self.windows = [window for window in self.spec.serialized_fn.windows]
{code}
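If the identity hypothesis is right, the deserialized value and the enum 
constant should compare equal by value but not by identity. A hypothetical 
diagnostic sketch (the window_type accessor name is assumed from the report's 
naming, not verified against the PyFlink sources):
{code:python}
# Hypothetical diagnostic, to be dropped into the operator for debugging.
window_type = window.window_type                # deserialized by protobuf (assumed field name)
expected = OverWindow.ROW_UNBOUNDED_FOLLOWING   # enum constant, value 6
print(window_type == expected)                  # True: both equal 6
print(window_type is expected)                  # False on aarch64 per this report
print(id(window_type), id(expected))            # differ whenever "is" is False
{code}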
Perhaps the gRPC/protobuf deserialization in the aarch64 environment affects 
the memory address, i.e. the deserialized int is a new object rather than the 
same cached object as the enum constant.
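
Since the branch only needs value equality, comparing with "==" instead of 
"is" would sidestep the identity problem entirely. A sketch of what such a 
change could look like (the actual change is in the linked pull request):
{code:python}
# Sketch: compare by value, not identity, so the branch is taken whenever
# window_type equals the enum's int value regardless of object identity.
elif window_type == OverWindow.ROW_UNBOUNDED_FOLLOWING:
    # row unbounded following window
    ...
{code}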

 

 

 

 

 


> In linux-aarch64 environment, using “is” to match the window type of 
> OverWindow returns incorrect matching results
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30637
>                 URL: https://issues.apache.org/jira/browse/FLINK-30637
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.6
>         Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64
> (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1 
> SMP Wed Mar 30 02:43:08 UTC 2022
>  
> pyflink-version:1.13.6
>            Reporter: Xin Chen
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
