[ https://issues.apache.org/jira/browse/FLINK-35661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35661:
-----------------------------------
    Labels: pull-request-available  (was: )

> MiniBatchGroupAggFunction can silently drop records under certain conditions
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-35661
>                 URL: https://issues.apache.org/jira/browse/FLINK-35661
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Ivan Burmistrov
>            Assignee: Natea Eshetu Beshada
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: image-2024-06-20-10-46-51-347.png, 
> image-2024-06-20-10-50-20-867.png, image-2024-06-20-11-05-53-253.png, 
> image.png
>
>
> h2. The story / Symptoms
> One day we made a small change to our Flink job that uses Flink SQL, adding a 
> couple of UDF-based aggregations (what these aggregations do is not 
> important), and surprisingly the job started working incorrectly: it produced 
> wrong results for some aggregation keys, or did not produce some keys at all.
> The symptoms were really weird. For instance, the read/write access rate to 
> accState (the internal state used by Table SQL for group-by aggregations) 
> dropped sharply. The screenshot below compares the read rate to this state 
> with the same chart one day earlier; the two should behave the same, yet 
> there is a big difference after the change. The write rate showed a similar 
> picture.
> !image-2024-06-20-10-50-20-867.png|width=770,height=338!
> Another interesting observation was that the GroupAggregate operator (the one 
> from Table SQL responsible for group-by aggregation) behaved weirdly: the 
> number of "records out" was disproportionately lower than the number of 
> "records in". By itself this doesn't mean anything, but combined with our 
> other observations about the job producing wrong results, it looked 
> suspicious.
> !image-2024-06-20-11-05-53-253.png|width=767,height=316!
> h2. Digging deeper
> After reverting the change, things got back to normal, and we concluded that 
> adding the new UDF-based aggregations had caused the issue. Then we realized 
> that we had accidentally forgotten to implement the *merge* method in our 
> UDF, which caused the planner to fall back to ONE_PHASE aggregation instead 
> of TWO_PHASE. After fixing the mistake and implementing *merge*, _things got 
> back to normal._ 
> Moreover, we realized that the UDF actually has nothing to do with the issue 
> (except for causing the ONE_PHASE fallback). So we reverted all the changes 
> and tested the job in ONE_PHASE mode. *The issue was reproducible in that 
> mode as well.* 
> So, summarizing: *when the job has mini-batch enabled, ONE_PHASE aggregation 
> works incorrectly.*
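> For reference, here is a minimal sketch of an aggregate UDF that implements 
> *merge* and therefore lets the planner choose TWO_PHASE (the class and field 
> names are illustrative, not our actual UDF):
> {code:java}
> import org.apache.flink.table.functions.AggregateFunction;
> 
> public class MySum extends AggregateFunction<Long, MySum.Acc> {
>     public static class Acc {
>         public long sum = 0L; // public field so Flink can treat it as a POJO
>     }
> 
>     @Override
>     public Acc createAccumulator() {
>         return new Acc();
>     }
> 
>     @Override
>     public Long getValue(Acc acc) {
>         return acc.sum;
>     }
> 
>     public void accumulate(Acc acc, Long value) {
>         acc.sum += value;
>     }
> 
>     // Without this method the planner cannot combine partial accumulators
>     // from the local phase and falls back to ONE_PHASE aggregation.
>     public void merge(Acc acc, Iterable<Acc> others) {
>         for (Acc other : others) {
>             acc.sum += other.sum;
>         }
>     }
> }
> {code}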
> h2. The bug
> It was clear that the issue had something to do with 
> [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L57]
>  because this is what distinguishes ONE_PHASE from TWO_PHASE mode.
>  
> After reading the code, we found this interesting fragment:
> {code:java}
>     @Override
>     public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out)
>             throws Exception {
>         for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
>             RowData currentKey = entry.getKey();
>             List<RowData> inputRows = entry.getValue();
>             boolean firstRow = false;
>             // step 1: get the accumulator for the current key
>             // set current key to access state under the key
>             ctx.setCurrentKey(currentKey);
>             RowData acc = accState.value();
>             if (acc == null) {
>                 // Don't create a new accumulator for a retraction message. This
>                 // might happen if the retraction message is the first message for
>                 // the key or after a state clean up.
>                 Iterator<RowData> inputIter = inputRows.iterator();
>                 while (inputIter.hasNext()) {
>                     RowData current = inputIter.next();
>                     if (isRetractMsg(current)) {
>                         inputIter.remove(); // remove all the beginning retraction messages
>                     } else {
>                         break;
>                     }
>                 }
>                 if (inputRows.isEmpty()) {
>                     return; // !!!! <--- this is bad !!!!
>                 }
>                 acc = function.createAccumulators();
>                 firstRow = true;
>             }
>             // ...
>         }
>     }
> {code}
> In this code we iterate over the whole bundle key by key, and at some point 
> do this:
> {code:java}
> if (inputRows.isEmpty()) {
>     return;
> }
> {code}
> Obviously, what was meant here is {{continue}} (i.e. finish with the current 
> key and move on to the next one), not a full stop of the whole bundle.
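> A minimal sketch of the fix (the actual patch is in the linked pull request 
> and may differ in details):
> {code:java}
> if (inputRows.isEmpty()) {
>     // All messages for this key were retractions and there is no prior
>     // accumulator; skip this key but keep processing the rest of the
>     // bundle instead of aborting finishBundle() for every remaining key.
>     continue;
> }
> {code}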
> 
> This line is reached when the bundle contains a key that has only retraction 
> messages; in that case the code below leaves {{inputRows}} empty:
>  
> {code:java}
> while (inputIter.hasNext()) {
>     RowData current = inputIter.next();
>     if (isRetractMsg(current)) {
>         inputIter.remove(); // remove all the beginning retraction messages
>     } else {
>         break;
>     }
> }
> {code}
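> One illustrative scenario in which such a bundle can arise (an assumption 
> for illustration, not necessarily our exact pipeline, and assuming a 
> {{TableEnvironment}} {{tEnv}} with a registered table {{clicks}}): a nested 
> aggregation, where the inner GROUP BY emits an UPDATE_BEFORE (retraction) 
> for the old value and an UPDATE_AFTER for the new one, and the two go to 
> *different* outer keys:
> {code:java}
> // When a user's count moves from N to N+1, the outer key N receives only
> // a retraction while the new record goes to key N+1 - so a mini-batch
> // bundle for key N may contain nothing but retraction messages.
> tEnv.executeSql(
>         "SELECT cnt, COUNT(*) AS freq "
>                 + "FROM (SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id) "
>                 + "GROUP BY cnt");
> {code}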
>  
> h2. Summary / conditions
> To summarize, the bug triggers when:
>  # Mini-batch is enabled
>  # The ONE_PHASE aggregation strategy is in effect
>  # A mini-batch bundle contains keys that have only retraction messages
> When these conditions are met, MiniBatchGroupAggFunction may silently drop 
> records (see the reproduction sketch below).
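> For anyone trying to reproduce this, a sketch of the table options that put 
> a job into this mode (the option keys are the standard Flink ones; the 
> values are illustrative):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> 
> TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> 
> // enable mini-batch processing (condition 1)
> tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
> tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "2 s");
> tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
> 
> // force ONE_PHASE aggregation (condition 2); the same thing happens
> // implicitly when a UDF used in the query does not implement merge()
> tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "ONE_PHASE");
> {code}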



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
