[ 
https://issues.apache.org/jira/browse/FLINK-36897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36897:
--------------------------------
    Description: 
When I created AbstractAsyncStateMapBundleOperator, which extends 
AbstractAsyncStateStreamOperator, the elements passed to 
AbstractAsyncStateMapBundleOperator's own processElement contained 
incorrect data.

The inheritance chains of the synchronous and asynchronous classes 
are:
AbstractMapBundleOperator->AbstractStreamOperator
AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
The new class was created so that KeyedMapBundleOperator can 
run asynchronously.

Example of the incorrect data. The original input was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("y", 1L))
data.+=(("y", 2L))
data.+=(("z", 3L))
The data actually passed to processElement was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
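
To make the symptom easier to see, the two lists above can be compared position by position. This is only a minimal sketch: the object CompareRecords and the method mismatches are hypothetical names used for illustration and are not part of Flink or of the test. It uses only the sample records quoted above.

```scala
// Hypothetical helper, not part of the Flink test suite: compares the input
// records with the records seen in processElement and reports where they differ.
object CompareRecords {
  // Records as they were written to the source (copied from the report).
  val expected: Seq[(String, Long)] =
    Seq(("x", 1L), ("x", 2L), ("x", 3L), ("y", 1L), ("y", 2L), ("z", 3L))

  // Records as they arrived in processElement (copied from the report).
  val observed: Seq[(String, Long)] =
    Seq(("x", 1L), ("x", 2L), ("x", 3L), ("x", 1L), ("x", 2L), ("x", 3L))

  // Indices at which the observed record differs from the input record.
  def mismatches(e: Seq[(String, Long)], o: Seq[(String, Long)]): Seq[Int] =
    e.zip(o).zipWithIndex.collect { case ((a, b), i) if a != b => i }

  def main(args: Array[String]): Unit =
    mismatches(expected, observed).foreach { i =>
      println(s"index $i: expected ${expected(i)}, observed ${observed(i)}")
    }
}
```

In this sample the last three records differ, and only in the String field: the Long values 1, 2, 3 are unchanged while "y" and "z" have become "x".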

How to reproduce:
Run testOverloadedAccumulator in sql/AggregateITCase.java on the branch in [1].

[1] https://github.com/Au-Miner/flink/tree/FLINK-36882


> Error executing processElement when inheriting from 
> AbstractAsyncStateStreamOperator
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-36897
>                 URL: https://issues.apache.org/jira/browse/FLINK-36897
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.0.0
>            Reporter: Wang Qilong
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
