Feng Jin created FLINK-33936:
--------------------------------
Summary: Mini-batch should output the result when the result is the
same as the last one if TTL is set.
Key: FLINK-33936
URL: https://issues.apache.org/jira/browse/FLINK-33936
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Feng Jin
When mini-batch is enabled and the aggregated result is the same as the
previous output, the new aggregation result is not sent downstream, so
downstream nodes never receive an update for that key. If a TTL is set for
state, the TTL of the downstream state is not refreshed either. The relevant
logic is as follows.
https://github.com/hackergin/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
{code:java}
if (!equaliser.equals(prevAggValue, newAggValue)) {
    // new row is not same with prev row
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow
                .replace(currentKey, prevAggValue)
                .setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
// new row is same with prev row, no need to output
{code}
When mini-batch is not enabled, the new result is still sent if a TTL is set,
even when it is the same as the previous result.
https://github.com/hackergin/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
{code:java}
if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
    // newRow is the same as before and state cleaning is not enabled.
    // We do not emit retraction and acc message.
    // If state cleaning is enabled, we have to emit messages to prevent too early
    // state eviction of downstream operators.
    return;
} else {
    // retract previous result
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow
                .replace(currentKey, prevAggValue)
                .setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
}
{code}
Therefore, considering the TTL scenario, I believe that when mini-batch
aggregation is enabled and a TTL is set, the new result should also be emitted
even if it is the same as the previous one.
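
As a rough illustration of the proposed behavior, the emit decision could mirror
the condition already used in GroupAggFunction. This is only a sketch: the class
EmitDecisionSketch and the method shouldEmit are hypothetical names, and it
assumes the mini-batch function has access to a stateRetentionTime value in the
same way GroupAggFunction does, which the snippets above do not confirm.

```java
// Hypothetical, self-contained sketch; not the actual Flink implementation.
final class EmitDecisionSketch {

    /**
     * Decides whether an aggregation result should be emitted downstream.
     *
     * @param stateRetentionTime state TTL in milliseconds; a value <= 0 means TTL is disabled
     *     (assumed to follow the same convention as GroupAggFunction)
     * @param sameAsPrevious whether the new aggregate equals the previously emitted one
     */
    public static boolean shouldEmit(long stateRetentionTime, boolean sameAsPrevious) {
        // Suppress the update only when the value is unchanged AND no state TTL is configured.
        // With TTL enabled, emitting the unchanged row lets downstream operators refresh
        // their state and avoids too-early eviction.
        return !(stateRetentionTime <= 0 && sameAsPrevious);
    }

    public static void main(String[] args) {
        System.out.println("TTL off, unchanged: emit=" + shouldEmit(0L, true));
        System.out.println("TTL off, changed:   emit=" + shouldEmit(0L, false));
        System.out.println("TTL on,  unchanged: emit=" + shouldEmit(3_600_000L, true));
        System.out.println("TTL on,  changed:   emit=" + shouldEmit(3_600_000L, false));
    }
}
```

With such a check, the mini-batch path would skip output only in the first case
(TTL disabled and result unchanged), matching the non-mini-batch behavior.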