[
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816250#comment-17816250
]
Jeyhun Karimov commented on FLINK-34129:
----------------------------------------
Hi [~loserwang1024], [~xuyangzhong], I am not sure whether this is a bug or the expected
behaviour of local-global aggregation.
Partitioned aggregates (see {{GroupAggFunction::processElement}}) solve the
above-mentioned issue by tracking {{firstRow}} and avoiding sending the first
row to the {{retract}} function. This works there because the state is partitioned
and exactly one operator instance is responsible for each partition, so that
instance alone decides what the first row of a key is.
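To make the partitioned case concrete, here is a simplified, single-threaded model of that idea. It is not Flink code: {{PartitionedSumSketch}}, its {{Kind}}/{{Row}} types and the {{sum, count}} accumulator are hypothetical, and the real {{GroupAggFunction}} handles more cases (e.g. suppressing unchanged updates). It assumes, as I read {{GroupAggFunction::processElement}}, that a retraction arriving when no accumulator exists for the key is simply dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a partitioned SUM aggregate (not Flink code).
public class PartitionedSumSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(Kind kind, int key, long value) {}

    // Per-key {sum, count}; a missing entry models expired state.
    private final Map<Integer, long[]> state = new HashMap<>();
    final List<String> out = new ArrayList<>();

    static boolean isRetract(Kind kind) {
        return kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE;
    }

    void processElement(Row in) {
        long[] acc = state.get(in.key());
        boolean firstRow = false;
        if (acc == null) {
            // No accumulator: this retraction refers to a row we never
            // emitted (or whose state expired), so drop it.
            if (isRetract(in.kind())) {
                return;
            }
            acc = new long[] {0L, 0L};
            firstRow = true;
        }
        long prevSum = acc[0];
        if (isRetract(in.kind())) {
            acc[0] -= in.value();
            acc[1]--;
        } else {
            acc[0] += in.value();
            acc[1]++;
        }

        if (acc[1] != 0) {
            state.put(in.key(), acc);
            if (firstRow) {
                out.add("+I[" + in.key() + ", " + acc[0] + "]");
            } else {
                out.add("-U[" + in.key() + ", " + prevSum + "]");
                out.add("+U[" + in.key() + ", " + acc[0] + "]");
            }
        } else {
            // Last record for the key retracted: emit DELETE and clear state.
            state.remove(in.key());
            if (!firstRow) {
                out.add("-D[" + in.key() + ", " + prevSum + "]");
            }
        }
    }

    public static void main(String[] args) {
        PartitionedSumSketch agg = new PartitionedSumSketch();
        // State for key 1 has expired; the source then emits -U[1, 20], +U[1, 20].
        agg.processElement(new Row(Kind.UPDATE_BEFORE, 1, 20));
        agg.processElement(new Row(Kind.UPDATE_AFTER, 1, 20));
        System.out.println(agg.out); // [+I[1, 20]]
    }
}
```

Because the dangling -U is dropped, the sink sees an insert rather than a delete for the same input that triggers the bug described below.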
In the presence of local-global aggregates, however:
- it is difficult to prevent the above-mentioned behaviour in
{{LocalGroupAggFunction}} instances, since there can be multiple
{{LocalGroupAggFunction}} instances and there is no ordering among them (which
would be needed to track {{firstRow}} and to avoid retracting it);
- it is difficult to prevent the above-mentioned behaviour in
{{GlobalGroupAggFunction}} instances, since they only receive pre-aggregated
data.
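A simplified model of the local side shows why: pre-aggregation folds the rows of a mini-batch into a partial {{sum, count}} accumulator, so row kinds no longer exist by the time data reaches the global aggregate. {{LocalPreAggSketch}} and its types are hypothetical, the model covers a single key only, and it is not the actual {{LocalGroupAggFunction}} code.

```java
import java.util.List;

// Hypothetical, simplified single-key model of local pre-aggregation (not Flink code).
public class LocalPreAggSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(Kind kind, long value) {}

    // Partial accumulator shipped from a local instance to the global aggregate.
    record PartialAcc(long sum, long count) {}

    static PartialAcc preAggregate(List<Row> bundle) {
        long sum = 0;
        long count = 0;
        for (Row r : bundle) {
            boolean retract = r.kind() == Kind.UPDATE_BEFORE || r.kind() == Kind.DELETE;
            // The local instance has no view of earlier rows or of other
            // instances, so it folds the retraction in as a negative delta.
            sum += retract ? -r.value() : r.value();
            count += retract ? -1 : 1;
        }
        return new PartialAcc(sum, count);
    }

    public static void main(String[] args) {
        // -U and +U of one source update, split across two bundles or instances.
        PartialAcc first = preAggregate(List.of(new Row(Kind.UPDATE_BEFORE, 20)));
        PartialAcc second = preAggregate(List.of(new Row(Kind.UPDATE_AFTER, 20)));
        System.out.println(first);  // PartialAcc[sum=-20, count=-1]
        System.out.println(second); // PartialAcc[sum=20, count=1]
    }
}
```

If both rows fall into the same bundle they cancel to a zero accumulator; if they are split, the global side receives the negative partial accumulator first and has no row kind left to decide whether it should be dropped.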
Currently, the only ways to avoid this behaviour are to either:
- use {{firstRow}} tracking (similar to
{{GroupAggFunction::processElement}}) in {{LocalGroupAggFunction}} AND use
parallelism 1, or
- use partitioned aggregates.
> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when
> state expired
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-34129
> URL: https://issues.apache.org/jira/browse/FLINK-34129
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.18.1
> Reporter: Hongshun Wang
> Priority: Major
> Fix For: 1.19.0
>
>
> Take SUM as an example:
> When the state has expired and an update operation then arrives from the source,
> MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but
> emits +I[1, -20] and -D[1, -20]. The sink then deletes the data from the
> external database.
> Let's see why this happens:
> * When the state has expired and -U[1, 20] arrives,
> MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets
> firstRow to true.
> {code:java}
> if (stateAcc == null) {
>     stateAcc = globalAgg.createAccumulators();
>     firstRow = true;
> } {code}
> * The sum accumulator then retracts 20, so the sum becomes -20.
> * Because this is the first row, MiniBatchGlobalGroupAggFunction turns the -U
> into a +I and emits it downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // if this was not the first row and we have to emit retractions
>     if (!firstRow) {
>         // ignore
>     } else {
>         // update acc to state
>         accState.update(acc);
>
>         // this is the first, output new result
>         // prepare INSERT message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
>         out.collect(resultRow);
>     }
> } {code}
> * When the next +U[1, 20] arrives, merging it brings the sum back to 0 and the
> record count back to zero, so RetractionRecordCounter#recordCountIsZero returns
> true. Because firstRow is now false, the +U is turned into a -D and emitted downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // ignore
> } else {
>     // we retracted the last record for this key
>     // if this is not first row sent out a DELETE message
>     if (!firstRow) {
>         // prepare DELETE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
>         out.collect(resultRow);
>     }
> } {code}
>
> So the sink receives +I and -D for a single source update operation, and the data
> is deleted.
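The walk-through above can be replayed with a single-key model of the emission logic in the quoted snippets. This is a hypothetical sketch, not Flink code: {{GlobalAggExpiredStateSketch}}, its {{Acc}} record and its {{finishBundle}} method are illustrative names, a {{null}} field stands in for expired state, and the update branch (elided as "// ignore" in the quote) is reduced to a plain -U/+U pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key model of the quoted global emission logic (not Flink code).
public class GlobalAggExpiredStateSketch {
    // Partial accumulator {sum, count} received from the local aggregate.
    record Acc(long sum, long count) {}

    private Acc state; // null models missing or expired state
    final List<String> out = new ArrayList<>();

    void finishBundle(int key, Acc bufferAcc) {
        Acc stateAcc = state;
        boolean firstRow = false;
        if (stateAcc == null) {
            stateAcc = new Acc(0, 0); // stands in for createAccumulators()
            firstRow = true;
        }
        long prevAggValue = stateAcc.sum();
        // Stands in for merging bufferAcc into stateAcc.
        Acc acc = new Acc(stateAcc.sum() + bufferAcc.sum(), stateAcc.count() + bufferAcc.count());
        long newAggValue = acc.sum();

        if (acc.count() != 0) { // !recordCountIsZero(acc)
            state = acc;
            if (firstRow) {
                out.add("+I[" + key + ", " + newAggValue + "]");
            } else {
                out.add("-U[" + key + ", " + prevAggValue + "]");
                out.add("+U[" + key + ", " + newAggValue + "]");
            }
        } else {
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            state = null; // clear all state
        }
    }

    public static void main(String[] args) {
        GlobalAggExpiredStateSketch global = new GlobalAggExpiredStateSketch();
        global.finishBundle(1, new Acc(-20, -1)); // partial acc from -U[1, 20]
        global.finishBundle(1, new Acc(20, 1));   // partial acc from +U[1, 20]
        System.out.println(global.out);           // [+I[1, -20], -D[1, -20]]
    }
}
```

The first bundle hits the firstRow branch with a negative count, which is exactly where the spurious +I[1, -20] comes from; the second bundle zeroes the count and turns the update into -D[1, -20].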
--
This message was sent by Atlassian Jira
(v8.20.10#820010)