When using SELECT DISTINCT in Flink SQL, I found that when a new message with
the same key arrives, the operator emits a retraction followed by a new insert
message. According to FLINK-8566
<https://issues.apache.org/jira/browse/FLINK-8566> and FLINK-8564
<https://issues.apache.org/jira/browse/FLINK-8564>, this is intentional: it
prevents downstream operators from evicting their state too early when state
cleaning is enabled. However, when I enable MiniBatch Aggregation
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>,
the operator stops emitting those retraction and insert messages.
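
To make the rationale from FLINK-8566 concrete, here is a toy model (not Flink code). It assumes a downstream operator that drops a key's state once that key has received no message for a fixed retention period. The upstream aggregate result for key "a" never changes while inputs keep arriving. If the upstream re-emits unchanged results, the downstream timer keeps getting refreshed; if it suppresses them, the downstream state expires even though the key is still active upstream. All class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a downstream operator with idle-state retention, where a
// key's state is dropped once it has not been touched for `retentionMs`.
// This is NOT Flink's implementation; names are hypothetical.
public class EvictionSketch {

    static final class Downstream {
        private final long retentionMs;
        private final Map<String, Long> value = new HashMap<>();
        private final Map<String, Long> lastAccess = new HashMap<>();

        Downstream(long retentionMs) {
            this.retentionMs = retentionMs;
        }

        // Drop every key whose last message is older than the retention time.
        void expire(long now) {
            lastAccess.entrySet().removeIf(e -> {
                boolean stale = now - e.getValue() > retentionMs;
                if (stale) {
                    value.remove(e.getKey());
                }
                return stale;
            });
        }

        // Any incoming message for a key refreshes that key's retention timer.
        void onUpdate(String key, long newValue, long now) {
            expire(now);
            value.put(key, newValue);
            lastAccess.put(key, now);
        }

        boolean hasState(String key, long now) {
            expire(now);
            return value.containsKey(key);
        }
    }

    // Upstream result for key "a" stays constant while inputs arrive every
    // `inputIntervalMs`. Returns whether downstream still has state at `endMs`.
    public static boolean downstreamKeepsState(boolean emitUnchanged, long retentionMs,
                                               long inputIntervalMs, long endMs) {
        Downstream down = new Downstream(retentionMs);
        Long prev = null;
        for (long t = 0; t <= endMs; t += inputIntervalMs) {
            long result = 1L; // the aggregate result never changes
            if (prev == null || emitUnchanged || prev.longValue() != result) {
                down.onUpdate("a", result, t);
            }
            prev = result;
        }
        return down.hasState("a", endMs);
    }

    public static void main(String[] args) {
        System.out.println("emit unchanged results:     "
                + downstreamKeepsState(true, 100, 60, 300));
        System.out.println("suppress unchanged results: "
                + downstreamKeepsState(false, 100, 60, 300));
    }
}
```

With a 100 ms retention and inputs every 60 ms, the first run keeps the downstream state alive at t = 300 ms, while the second loses it, because only the initial message at t = 0 ever reached the downstream operator.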

According to the source code of GroupAggFunction
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183>
and MiniBatchGroupAggFunction
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>,
when MiniBatch Aggregation is enabled, nothing is emitted if the new aggregate
value equals the previous one, regardless of whether state cleaning is enabled.

But why not? As far as I can tell, MiniBatch Aggregation does nothing to
prevent state eviction in downstream operators.

*GroupAggFunction.java:*
    if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
        // newRow is the same as before and state cleaning is not enabled.
        // We do not emit retraction and acc message.
        // If state cleaning is enabled, we have to emit messages to prevent too early
        // state eviction of downstream operators.
        return;
    } else {
        // retract previous result
        if (generateUpdateBefore) {
            // prepare UPDATE_BEFORE message for previous row
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(resultRow);
        }
        // prepare UPDATE_AFTER message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    }


*MiniBatchGroupAggFunction.java:*
    if (!equaliser.equals(prevAggValue, newAggValue)) {
        // new row is not same with prev row
        if (generateUpdateBefore) {
            // prepare UPDATE_BEFORE message for previous row
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(resultRow);
        }
        // prepare UPDATE_AFTER message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
        out.collect(resultRow);
    }
    // new row is same with prev row, no need to output
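
The difference between the two excerpts can be restated as two small pure functions. This is a simplified sketch, not Flink code: rows are plain strings and aggregate values are longs instead of RowData and RowKind. The GroupAggFunction excerpt stops before the UPDATE_AFTER row is collected; the sketch assumes that row is emitted right afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified restatement of the two emission rules quoted above.
// Strings stand in for Flink rows; names are hypothetical.
public class EmitPolicySketch {

    // GroupAggFunction rule: an unchanged result is suppressed only when
    // state cleaning is disabled.
    public static List<String> groupAgg(long prev, long next,
                                        boolean stateCleaningEnabled,
                                        boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && prev == next) {
            return out;
        }
        if (generateUpdateBefore) {
            out.add("-U(" + prev + ")");
        }
        // Assumed to be collected just after the quoted excerpt.
        out.add("+U(" + next + ")");
        return out;
    }

    // MiniBatchGroupAggFunction rule: an unchanged result is always
    // suppressed; stateCleaningEnabled is deliberately ignored.
    public static List<String> miniBatchGroupAgg(long prev, long next,
                                                 boolean stateCleaningEnabled,
                                                 boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (prev != next) {
            if (generateUpdateBefore) {
                out.add("-U(" + prev + ")");
            }
            out.add("+U(" + next + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same aggregate value twice, state cleaning enabled.
        System.out.println("GroupAgg:          " + groupAgg(1, 1, true, true));
        System.out.println("MiniBatchGroupAgg: " + miniBatchGroupAgg(1, 1, true, true));
    }
}
```

With state cleaning enabled and an unchanged value, the first function returns [-U(1), +U(1)] and the second returns [], which is exactly the behavioral gap described above.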


Flink version: 1.12-SNAPSHOT (GitHub master
<https://github.com/apache/flink>)
Planner: Blink Planner



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
