[ https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803395#comment-17803395 ]

xuyang commented on FLINK-33936:
--------------------------------

Thanks for your report. This bug seems to date back to when the mini-batch
aggregation optimization was first introduced: some behaviors of the
non-mini-batch group agg function were not aligned in the mini-batch version.
I think we need to fix it.
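The misalignment comes down to one question: when is an unchanged aggregate suppressed? The minimal Java sketch below is a hypothetical model, not Flink code. It contrasts the mini-batch rule described in the report (suppress whenever the previous and new values are equal) with the rule used by GroupAggFunction (suppress only when the values are equal and `stateRetentionTime <= 0`). The class and method names are illustrative, `Objects.equals` stands in for Flink's generated `equaliser`, and the retention time is assumed to be in milliseconds.

```java
import java.util.Objects;

// Hypothetical helpers, not existing Flink methods.
class EmitPolicySketch {

    // Rule used by the non-mini-batch path: an unchanged result is only
    // suppressed when state cleaning (retention time) is disabled.
    static boolean shouldSuppress(Object prevAggValue, Object newAggValue, long stateRetentionTime) {
        return stateRetentionTime <= 0 && Objects.equals(prevAggValue, newAggValue);
    }

    // Current mini-batch rule, for comparison: the retention time is ignored.
    static boolean currentMiniBatchSuppress(Object prevAggValue, Object newAggValue) {
        return Objects.equals(prevAggValue, newAggValue);
    }

    public static void main(String[] args) {
        long ttl = 3_600_000L; // assumed retention time of one hour, in milliseconds
        System.out.println(currentMiniBatchSuppress(5L, 5L)); // true: downstream TTL not refreshed
        System.out.println(shouldSuppress(5L, 5L, ttl));      // false: -U/+U emitted, TTL refreshed
        System.out.println(shouldSuppress(5L, 5L, 0));        // true: no TTL, suppression is safe
        System.out.println(shouldSuppress(5L, 6L, ttl));      // false: changed results are always emitted
    }
}
```

Sharing a single predicate between both paths is one possible way to keep them aligned; the actual fix may be structured differently.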

> The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33936
>                 URL: https://issues.apache.org/jira/browse/FLINK-33936
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.0
>            Reporter: Feng Jin
>            Priority: Major
>
> Currently, if mini-batch is enabled and the aggregated result is the same as
> the previous output, the current aggregation result is not sent downstream.
> As a result, downstream operators do not receive the updated data, and if
> state TTL is configured, the TTL of the downstream state is not refreshed
> either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
>                     if (!equaliser.equals(prevAggValue, newAggValue)) {
>                         // new row is not same with prev row
>                         if (generateUpdateBefore) {
>                             // prepare UPDATE_BEFORE message for previous row
>                             resultRow
>                                     .replace(currentKey, prevAggValue)
>                                     .setRowKind(RowKind.UPDATE_BEFORE);
>                             out.collect(resultRow);
>                         }
>                         // prepare UPDATE_AFTER message for new row
>                         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                         out.collect(resultRow);
>                     }
>                     // new row is same with prev row, no need to output
> {code}
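To make the mini-batch branch concrete, here is a simplified, hypothetical model of it in plain Java with no Flink dependencies. Inputs for a key are buffered, folded into a SUM when the batch finishes, and the new sum is compared with the previously emitted one. The class name, the string encoding of rows (`+I`, `-U`, `+U`), the use of SUM, and the first-result `+I` branch (not part of the quoted excerpt) are assumptions for illustration. The second batch shows the case from the report: the inputs cancel out, the aggregate is unchanged, and nothing is sent downstream regardless of any TTL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the mini-batch path (not Flink code).
class MiniBatchSumSketch {

    private final boolean generateUpdateBefore;
    // Last aggregate emitted per key (stands in for the keyed accumulator state).
    private final Map<String, Long> prevAgg = new HashMap<>();
    // Inputs buffered for the current mini-batch, grouped by key.
    private final Map<String, List<Long>> buffer = new LinkedHashMap<>();

    MiniBatchSumSketch(boolean generateUpdateBefore) {
        this.generateUpdateBefore = generateUpdateBefore;
    }

    void add(String key, long value) {
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Folds each key's buffered inputs and emits only if the aggregate changed.
    List<String> finishBatch() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : buffer.entrySet()) {
            String key = e.getKey();
            Long prev = prevAgg.get(key);
            long acc = prev == null ? 0L : prev;
            for (long v : e.getValue()) {
                acc += v;
            }
            if (prev == null) {
                out.add("+I(" + key + "," + acc + ")"); // first result for this key
            } else if (prev.longValue() != acc) {
                if (generateUpdateBefore) {
                    out.add("-U(" + key + "," + prev + ")");
                }
                out.add("+U(" + key + "," + acc + ")");
            }
            // else: new row is the same as the previous row, nothing is emitted
            prevAgg.put(key, acc);
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        MiniBatchSumSketch op = new MiniBatchSumSketch(true);
        op.add("k", 5);
        System.out.println(op.finishBatch()); // [+I(k,5)]
        op.add("k", 3);
        op.add("k", -3); // the two inputs cancel out within the batch
        System.out.println(op.finishBatch()); // []
        op.add("k", 1);
        System.out.println(op.finishBatch()); // [-U(k,5), +U(k,6)]
    }
}
```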
> When mini-batch is not enabled, the new result is still sent if TTL is set,
> even when it is the same as the previous result.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
>                 if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
>                     // newRow is the same as before and state cleaning is not enabled.
>                     // We do not emit retraction and acc message.
>                     // If state cleaning is enabled, we have to emit messages to prevent too early
>                     // state eviction of downstream operators.
>                     return;
>                 } else {
>                     // retract previous result
>                     if (generateUpdateBefore) {
>                         // prepare UPDATE_BEFORE message for previous row
>                         resultRow
>                                 .replace(currentKey, prevAggValue)
>                                 .setRowKind(RowKind.UPDATE_BEFORE);
>                         out.collect(resultRow);
>                     }
>                     // prepare UPDATE_AFTER message for new row
>                     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                 }
> {code}
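For comparison, here is a hypothetical plain-Java model of the non-mini-batch branch quoted above, returning the messages emitted for one update of an existing key. It assumes, as the report's description implies, that the row prepared as UPDATE_AFTER in the `else` branch is emitted right after the excerpt. `Objects.equals` stands in for the generated `equaliser`, and the names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of the GroupAggFunction branch (not Flink code).
class GroupAggEmitSketch {

    static List<String> emit(Object prevAggValue, Object newAggValue,
                             boolean generateUpdateBefore, long stateRetentionTime) {
        List<String> out = new ArrayList<>();
        if (stateRetentionTime <= 0 && Objects.equals(prevAggValue, newAggValue)) {
            // Unchanged result and no state cleaning: emit nothing.
            return out;
        }
        // Retract the previous result if UPDATE_BEFORE messages are required.
        if (generateUpdateBefore) {
            out.add("-U(" + prevAggValue + ")");
        }
        // Emit the new result, even if it equals the previous one.
        out.add("+U(" + newAggValue + ")");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(5L, 5L, true, 0));         // [] : no TTL, unchanged, suppressed
        System.out.println(emit(5L, 5L, true, 3_600_000)); // [-U(5), +U(5)] : TTL set, still emitted
        System.out.println(emit(5L, 6L, false, 0));        // [+U(6)]
    }
}
```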
> Therefore, considering TTL scenarios, I believe that when mini-batch
> aggregation is enabled, the new result should also be emitted even if it is
> the same as the previous one, at least when state TTL is configured.
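Why this matters for TTL can be shown with a small, hypothetical simulation in plain Java. A downstream operator keeps per-key state that expires when it has not been written for `ttlMillis`, and the upstream aggregate for key `k` stays the same at t = 0, 600 and 1200 ms. Refreshing the TTL on every received record is a simplification of Flink's state TTL, and all names and timestamps are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Flink code: per-key downstream state that expires
// if it has not been written for longer than ttlMillis.
class DownstreamTtlSketch {

    private final long ttlMillis;
    private final Map<String, Long> lastWrite = new HashMap<>();

    DownstreamTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Receiving a record for a key refreshes that key's TTL timestamp.
    void onRecord(String key, long now) {
        lastWrite.put(key, now);
    }

    boolean isExpired(String key, long now) {
        Long last = lastWrite.get(key);
        return last == null || now - last >= ttlMillis;
    }

    // Upstream produces the same aggregate for "k" at t = 0, 600 and 1200.
    // If unchanged results are suppressed, only the first one reaches downstream.
    static boolean expiredAt1500(boolean suppressUnchanged) {
        DownstreamTtlSketch downstream = new DownstreamTtlSketch(1000);
        long[] emitTimes = {0, 600, 1200};
        for (int i = 0; i < emitTimes.length; i++) {
            boolean unchanged = i > 0; // every result after the first equals the previous one
            if (!(suppressUnchanged && unchanged)) {
                downstream.onRecord("k", emitTimes[i]);
            }
        }
        return downstream.isExpired("k", 1500);
    }

    public static void main(String[] args) {
        System.out.println("suppressed unchanged results -> expired: " + expiredAt1500(true));
        System.out.println("emitted unchanged results    -> expired: " + expiredAt1500(false));
    }
}
```

With suppression, the downstream state for `k` expires at t = 1500 even though upstream kept producing results for that key; emitting the unchanged results keeps it alive.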



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
