[ 
https://issues.apache.org/jira/browse/FLINK-36897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17905361#comment-17905361
 ] 

Zakelly Lan commented on FLINK-36897:
-------------------------------------

I had a glance at your commit, in 
`AsyncStateMiniBatchGroupAggFunction#finishBundle` you should not do 
`ctx.setCurrentKey(currentKey);` which is a sync processing method. The 
`AbstractAsyncStateStreamOperator#asyncProcessWithKey` is prepared for this 
scenario. 

Given the minibatch is obviously more difficult to process with async state, my 
advice is to hold back the specific sql implementations until there is a 
unified framework for minibatch with async state. 

> Error executing processElement when inheriting from 
> AbstractAsyncStateStreamOperator
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-36897
>                 URL: https://issues.apache.org/jira/browse/FLINK-36897
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.0.0
>            Reporter: Wang Qilong
>            Priority: Major
>
> When I created the AbstractAsynchronous StateMapBundleOperator and inherited 
> it from the AbstractAsynchronous StateStreamOperator, there was an error in 
> the data passed into the element by the processElement of the 
> AbstractAsynchronous StateMapBundleOperator itself
> The inheritance relationship between asynchronous synchronization and two 
> classes is:
> AbstractMapBundleOperator->AbstractStreamOperator
> AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
> The reason for creating this class is to enable KeyedMapBundleOperator to 
> support asynchronous running capability
> Example of incorrect information: For example, the original data format was:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("y", 1L))
> data.+=(("y", 2L))
> data.+=(("z", 3L))
> So the result of data transmission becomes:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> How to reproduce:
> Run testOverloadedAccumulator() in sql/AggregateITCase.java in [1]
>  
> [1]  [https://github.com/Au-Miner/flink/tree/FLINK-36882]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to