When using SELECT DISTINCT in Flink SQL, I found that when a new message with the same key arrives, the operator emits a retract message followed by a new insert message. According to FLINK-8566 <https://issues.apache.org/jira/browse/FLINK-8566> and FLINK-8564 <https://issues.apache.org/jira/browse/FLINK-8564>, this is intentional: it prevents downstream operators from evicting their state too early when state cleaning is enabled. However, once I enabled MiniBatch Aggregation <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>, those retract and insert messages were no longer emitted.

According to the source code of GroupAggFunction <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183> and MiniBatchGroupAggFunction <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>, these messages are not emitted when MiniBatch Aggregation is enabled. But why not? I don't think MiniBatch Aggregation by itself prevents too-early state eviction in downstream operators.

*GroupAggFunction.java:*

    if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
        // newRow is the same as before and state cleaning is not enabled.
        // We do not emit retraction and acc message.
        // If state cleaning is enabled, we have to emit messages to prevent too early
        // state eviction of downstream operators.
        return;
    } else {
        // retract previous result
        if (generateUpdateBefore) {
            // prepare UPDATE_BEFORE message for previous row
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(resultRow);
        }
        // prepare UPDATE_AFTER message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    }

*MiniBatchGroupAggFunction.java:*

    if (!equaliser.equals(prevAggValue, newAggValue)) {
        // new row is not same with prev row
        if (generateUpdateBefore) {
            // prepare UPDATE_BEFORE message for previous row
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(resultRow);
        }
        // prepare UPDATE_AFTER message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
        out.collect(resultRow);
    }
    // new row is same with prev row, no need to output

Flink Version: 1.12-SNAPSHOT (GitHub Master <https://github.com/apache/flink>)
Planner: Blink Planner
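
To make the difference between the two branches concrete, here is a minimal standalone sketch of the two emit decisions. It is not the Flink code: the class EmitDecisionSketch, its two methods, and the "-U(...)" / "+U(...)" strings standing in for UPDATE_BEFORE / UPDATE_AFTER rows are all hypothetical, and Objects.equals is used in place of the generated equaliser. It also assumes that the UPDATE_AFTER row prepared in GroupAggFunction is collected after the quoted snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EmitDecisionSketch {

    /** Simplified model of the GroupAggFunction branch (no mini-batch). */
    public static List<String> groupAggEmit(
            boolean stateCleaningEnabled,
            boolean generateUpdateBefore,
            Object prevAggValue,
            Object newAggValue) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && Objects.equals(prevAggValue, newAggValue)) {
            // Unchanged result and no state cleaning: nothing is emitted.
            return out;
        }
        if (generateUpdateBefore) {
            out.add("-U(" + prevAggValue + ")"); // stands in for UPDATE_BEFORE
        }
        out.add("+U(" + newAggValue + ")"); // stands in for UPDATE_AFTER
        return out;
    }

    /** Simplified model of the MiniBatchGroupAggFunction branch. */
    public static List<String> miniBatchEmit(
            boolean generateUpdateBefore,
            Object prevAggValue,
            Object newAggValue) {
        List<String> out = new ArrayList<>();
        // Note: there is no stateCleaningEnabled check in this branch.
        if (!Objects.equals(prevAggValue, newAggValue)) {
            if (generateUpdateBefore) {
                out.add("-U(" + prevAggValue + ")");
            }
            out.add("+U(" + newAggValue + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same aggregate value arrives again, state cleaning enabled.
        System.out.println(groupAggEmit(true, true, "a", "a"));  // [-U(a), +U(a)]
        System.out.println(miniBatchEmit(true, "a", "a"));       // []
    }
}
```

With an unchanged value and state cleaning enabled, the first model still emits both messages while the second emits nothing, which is the behaviour difference asked about above.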