Wang Qilong created FLINK-36897:
-----------------------------------

             Summary: Error in calling processElement for 
AbstractAsyncStateStreamOperator
                 Key: FLINK-36897
                 URL: https://issues.apache.org/jira/browse/FLINK-36897
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.0.0
            Reporter: Wang Qilong


When I created AbstractAsyncStateMapBundleOperator as a subclass of 
AbstractAsyncStateStreamOperator, the elements passed into the processElement 
method of AbstractAsyncStateMapBundleOperator contained incorrect data.

The inheritance chains of the synchronous and asynchronous classes are:
AbstractMapBundleOperator->AbstractStreamOperator
AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
The new class was created so that KeyedMapBundleOperator can run 
asynchronously.

Example of the incorrect data. The original input was:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("y", 1L))
data.+=(("y", 2L))
data.+=(("z", 3L))
The data that arrives in processElement is:
val data = new mutable.MutableList[(String, Long)]
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
data.+=(("x", 1L))
data.+=(("x", 2L))
data.+=(("x", 3L))
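
The mismatch between the two lists above can be checked mechanically. The 
following is a minimal sketch in plain Scala, independent of Flink. The object 
name BundleDataCheck and the values expected, received and mismatches are 
hypothetical names introduced only for illustration. It pairs each input 
record with the record seen at the same position in processElement and reports 
the positions that differ.

```scala
object BundleDataCheck {
  // Records as produced by the test source (copied from the report above).
  val expected: List[(String, Long)] =
    List(("x", 1L), ("x", 2L), ("x", 3L), ("y", 1L), ("y", 2L), ("z", 3L))

  // Records as seen inside processElement (copied from the report above).
  val received: List[(String, Long)] =
    List(("x", 1L), ("x", 2L), ("x", 3L), ("x", 1L), ("x", 2L), ("x", 3L))

  // (index, expected, received) for every position where the two differ.
  val mismatches: List[(Int, (String, Long), (String, Long))] =
    expected.zip(received).zipWithIndex.collect {
      case ((e, r), i) if e != r => (i, e, r)
    }

  def main(args: Array[String]): Unit =
    mismatches.foreach { case (i, e, r) =>
      println(s"record $i: expected $e, received $r")
    }
}
```

For the data in this report, only the last three records differ, and in each 
of them the String field is "x" instead of "y" or "z" while the Long field 
matches the input.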

How to reproduce:
Run the testOverloadedAccumulator test in SQL/AggregateITCase on the branch 
in [1].

 

[1]  https://github.com/Au-Miner/flink/tree/FLINK-36882



