Wang Qilong created FLINK-36897: ----------------------------------- Summary: Error in calling processElement for AbstractAsynchronous StateStreamOperator Key: FLINK-36897 URL: https://issues.apache.org/jira/browse/FLINK-36897 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 2.0.0 Reporter: Wang Qilong
When I created the AbstractAsynchronous StateMapBundleOperator and inherited it from the AbstractAsynchronous StateStreamOperator, there was an error in the data passed into the element by the processElement of the AbstractAsynchronous StateMapBundleOperator itself The inheritance relationship between asynchronous synchronization and two classes is: AbstractMapBundleOperator->AbstractStreamOperator AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator The reason for creating this class is to enable KeyedMapBundleOperator to support asynchronous running capability Example of incorrect information: For example, the original data format was: val data = new mutable.MutableList[(String, Long)] data.+=(("x", 1L)) data.+=(("x", 2L)) data.+=(("x", 3L)) data.+=(("y", 1L)) data.+=(("y", 2L)) data.+=(("z", 3L)) So the result of data transmission becomes: val data = new mutable.MutableList[(String, Long)] data.+=(("x", 1L)) data.+=(("x", 2L)) data.+=(("x", 3L)) data.+=(("x", 1L)) data.+=(("x", 2L)) data.+=(("x", 3L)) How to reproduce: Run testOverloadedAccumulator in SQL/AggregateITCase.jva in [1] [1] https://github.com/Au-Miner/flink/tree/FLINK-36882 -- This message was sent by Atlassian Jira (v8.20.10#820010)