[
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Feng Jin updated FLINK-33936:
-----------------------------
Summary: Mini-batch agg should output the result when the result is same as
last if TTL is set. (was: Mini-batch should output the result when the
result is same as last if TTL is set.)
> Mini-batch agg should output the result when the result is same as last if
> TTL is set.
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-33936
> URL: https://issues.apache.org/jira/browse/FLINK-33936
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.18.0
> Reporter: Feng Jin
> Priority: Major
>
> When mini-batch is enabled and the aggregated result equals the previously
> emitted result, the new result is not sent downstream, so downstream
> operators receive no update for that key. If state TTL is configured, the TTL
> of the downstream state is not refreshed either, and that state can expire
> too early.
> The relevant logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
>     // new row is not same with prev row
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow
>                 .replace(currentKey, prevAggValue)
>                 .setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, the new result is still sent if TTL is set,
> even when it is the same as the previous result.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
>     // newRow is the same as before and state cleaning is not enabled.
>     // We do not emit retraction and acc message.
>     // If state cleaning is enabled, we have to emit messages to prevent too early
>     // state eviction of downstream operators.
>     return;
> } else {
>     // retract previous result
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow
>                 .replace(currentKey, prevAggValue)
>                 .setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, to handle TTL scenarios, I believe that mini-batch aggregation
> should also emit the new result when it is the same as the previous one,
> provided TTL is set.
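
The proposed behavior can be illustrated with a minimal, self-contained sketch. It is not the actual Flink patch: the class MiniBatchEmitSketch and the helpers shouldEmit and emit are hypothetical, row kinds are modeled as plain strings, and it assumes the mini-batch function has access to a retention-time value comparable to stateRetentionTime in GroupAggFunction. It only shows the emit decision: suppress output when the result is unchanged and TTL is disabled, emit otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class MiniBatchEmitSketch {

    /**
     * Hypothetical helper mirroring the condition quoted from GroupAggFunction:
     * output is suppressed only when the result is unchanged AND state TTL is disabled.
     */
    static boolean shouldEmit(long stateRetentionTime, Object prevAggValue, Object newAggValue) {
        boolean unchanged = Objects.equals(prevAggValue, newAggValue);
        return !(stateRetentionTime <= 0 && unchanged);
    }

    /** Collects the messages that would be sent downstream for one key (strings stand in for rows). */
    static List<String> emit(
            long stateRetentionTime, boolean generateUpdateBefore, Object prev, Object next) {
        List<String> out = new ArrayList<>();
        if (!shouldEmit(stateRetentionTime, prev, next)) {
            // Unchanged result and no TTL: nothing to send.
            return out;
        }
        if (generateUpdateBefore) {
            // Retract the previous result first.
            out.add("UPDATE_BEFORE " + prev);
        }
        // Send the new result, even if it equals the previous one, so downstream TTL is refreshed.
        out.add("UPDATE_AFTER " + next);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(0L, true, 5, 5));          // TTL disabled, unchanged result
        System.out.println(emit(3_600_000L, true, 5, 5));  // TTL enabled, unchanged result
        System.out.println(emit(0L, true, 5, 6));          // TTL disabled, changed result
    }
}
```

With TTL disabled and an unchanged result nothing is emitted; with TTL enabled the same unchanged result still produces an UPDATE_BEFORE/UPDATE_AFTER pair, which is what keeps downstream state from expiring early.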