[ https://issues.apache.org/jira/browse/FLINK-36897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17905361#comment-17905361 ]
Zakelly Lan commented on FLINK-36897:
-------------------------------------
I had a quick look at your commit. In
`AsyncStateMiniBatchGroupAggFunction#finishBundle` you should not call
`ctx.setCurrentKey(currentKey);`, which is a synchronous processing method.
`AbstractAsyncStateStreamOperator#asyncProcessWithKey` is intended for this
scenario.
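To illustrate the general hazard, here is a minimal toy sketch in plain Java. It is not Flink code and makes no claim about how Flink implements its key context or about the exact cause of the reported result. `ToyOperator`, `deferStateAccess` and `processWithKey` are hypothetical names invented for this example. It only shows why switching a shared "current key" synchronously can go wrong when the state access runs later, and why binding the key to the deferred work avoids that.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only: none of these types are Flink classes. */
public class KeyContextSketch {

    /** Hypothetical stand-in for an operator whose state accesses run later. */
    static final class ToyOperator {
        private String currentKey; // shared, mutable key context
        private final Queue<Runnable> pending = new ArrayDeque<>();
        final List<String> observed = new ArrayList<>();

        /** Sync style: overwrite the shared key immediately. */
        void setCurrentKey(String key) {
            currentKey = key;
        }

        /** Deferred access that reads whatever key is current when it runs. */
        void deferStateAccess(long value) {
            pending.add(() -> observed.add(currentKey + "=" + value));
        }

        /** Key-bound style: the key travels with the deferred work. */
        void processWithKey(String key, long value) {
            pending.add(() -> {
                currentKey = key; // switched right before the access runs
                observed.add(currentKey + "=" + value);
            });
        }

        /** Runs all deferred work in submission order. */
        void drain() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    /** Sets the key synchronously, then defers the access. */
    static List<String> syncStyle(String[] keys, long[] values) {
        ToyOperator op = new ToyOperator();
        for (int i = 0; i < keys.length; i++) {
            op.setCurrentKey(keys[i]);
            op.deferStateAccess(values[i]);
        }
        op.drain();
        return op.observed;
    }

    /** Submits each access together with its key. */
    static List<String> keyBoundStyle(String[] keys, long[] values) {
        ToyOperator op = new ToyOperator();
        for (int i = 0; i < keys.length; i++) {
            op.processWithKey(keys[i], values[i]);
        }
        op.drain();
        return op.observed;
    }

    public static void main(String[] args) {
        String[] keys = {"x", "y", "z"};
        long[] values = {1L, 2L, 3L};
        System.out.println(syncStyle(keys, values));     // [z=1, z=2, z=3]
        System.out.println(keyBoundStyle(keys, values)); // [x=1, y=2, z=3]
    }
}
```

In the toy model, every deferred access in the sync style sees the last key that was set, while the key-bound style keeps each value with its own key.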
Given that minibatch is clearly harder to handle with async state, my
advice is to hold back the specific SQL implementations until there is a
unified framework for minibatch with async state.
> Error executing processElement when inheriting from
> AbstractAsyncStateStreamOperator
> ------------------------------------------------------------------------------------
>
> Key: FLINK-36897
> URL: https://issues.apache.org/jira/browse/FLINK-36897
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.0.0
> Reporter: Wang Qilong
> Priority: Major
>
> When I created AbstractAsyncStateMapBundleOperator, inheriting from
> AbstractAsyncStateStreamOperator, the elements passed into
> AbstractAsyncStateMapBundleOperator's own processElement were incorrect.
> The inheritance relationships of the synchronous and asynchronous classes
> are:
> AbstractMapBundleOperator->AbstractStreamOperator
> AbstractAsyncStateMapBundleOperator->AbstractAsyncStateStreamOperator->AbstractStreamOperator
> The reason for creating this class is to enable KeyedMapBundleOperator to
> support asynchronous execution.
> Example of the incorrect data. The original input was:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("y", 1L))
> data.+=(("y", 2L))
> data.+=(("z", 3L))
> The data actually passed into processElement becomes:
> val data = new mutable.MutableList[(String, Long)]
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> data.+=(("x", 1L))
> data.+=(("x", 2L))
> data.+=(("x", 3L))
> How to reproduce:
> Run testOverloadedAccumulator() in sql/AggregateITCase.java in [1]
>
> [1] [https://github.com/Au-Miner/flink/tree/FLINK-36882]