[
https://issues.apache.org/jira/browse/FLINK-35661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-35661:
-----------------------------------
Labels: pull-request-available (was: )
> MiniBatchGroupAggFunction can silently drop records under certain conditions
> ----------------------------------------------------------------------------
>
> Key: FLINK-35661
> URL: https://issues.apache.org/jira/browse/FLINK-35661
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Ivan Burmistrov
> Assignee: Natea Eshetu Beshada
> Priority: Blocker
> Labels: pull-request-available
> Attachments: image-2024-06-20-10-46-51-347.png,
> image-2024-06-20-10-50-20-867.png, image-2024-06-20-11-05-53-253.png,
> image.png
>
>
> h2. The story / Symptoms
> One day we made a small change to our Flink job that uses Flink SQL: we added a
> couple of UDF-based aggregations (what exactly these aggregations do is not
> important). Surprisingly, the job started working incorrectly - producing
> wrong results for some aggregation keys, or not producing some keys at all.
> The symptoms were really weird. For instance, the read / write access rate
> to accState (the internal state used by Table SQL for group-by aggregations)
> dropped sharply. The screenshot below compares the read rate to this state
> with the same chart one day earlier - they should behave the same, yet there
> is a big difference after the change. The write rate showed a similar picture.
> !image-2024-06-20-10-50-20-867.png|width=770,height=338!
> Another interesting observation was that the GroupAggregate operator (the one
> from Table SQL responsible for group-by aggregation) behaved strangely: the
> number of "records out" was disproportionately lower than the number of
> "records in". By itself this doesn't mean anything, but combined with our other
> observations about the job producing wrong results, it looked suspicious.
> !image-2024-06-20-11-05-53-253.png|width=767,height=316!
> h2. Digging deeper
> After reverting the change, things got back to normal, and we concluded that
> adding the new UDF-based aggregations had caused the issue. Then we realized that we
> had accidentally forgotten to implement the *merge* method in our UDF, which caused
> the planner to fall back to ONE_PHASE aggregation instead of TWO_PHASE. After fixing
> the mistake and implementing *merge*, _things got back to normal._
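> For context, the *merge* contract we had forgotten looks roughly like this - a minimal,
> hypothetical UDF with made-up names, not our actual aggregation:
> {code:java}
> import org.apache.flink.table.functions.AggregateFunction;
> 
> public class MySum extends AggregateFunction<Long, MySum.SumAcc> {
> 
>     /** Simple POJO accumulator. */
>     public static class SumAcc {
>         public long sum = 0L;
>     }
> 
>     @Override
>     public SumAcc createAccumulator() {
>         return new SumAcc();
>     }
> 
>     @Override
>     public Long getValue(SumAcc acc) {
>         return acc.sum;
>     }
> 
>     // Contract method, resolved by the planner via reflection.
>     public void accumulate(SumAcc acc, Long value) {
>         if (value != null) {
>             acc.sum += value;
>         }
>     }
> 
>     // Without this method the planner cannot split the aggregation into a
>     // local + global (TWO_PHASE) plan and falls back to ONE_PHASE.
>     public void merge(SumAcc acc, Iterable<SumAcc> others) {
>         for (SumAcc other : others) {
>             acc.sum += other.sum;
>         }
>     }
> }{code}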
> Moreover, we realized that the UDF actually has nothing to do with the issue
> (except for causing that ONE_PHASE fallback). So we reverted all the changes
> and tested the job with ONE_PHASE forced explicitly. *The issue still reproduced in that mode.*
> So, summarizing: *when the job has mini-batch enabled, ONE_PHASE aggregation
> works incorrectly.*
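> For reproduction, this is roughly the setup under which the problem shows up on a recent
> Flink version; the ONE_PHASE strategy is forced explicitly here, and the values are
> illustrative (a sketch, not our production configuration):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> 
> public class OnePhaseMiniBatchRepro {
>     public static void main(String[] args) {
>         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>         // Mini-batch must be enabled, otherwise MiniBatchGroupAggFunction is not used at all.
>         tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
>         tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "2 s");
>         tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
>         // Forcing ONE_PHASE reproduces what the merge-less UDF caused implicitly.
>         tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "ONE_PHASE");
>         // ... register sources and run the GROUP BY query here ...
>     }
> }{code}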
> h2. The bug
> It was clear that the issue had something to do with
> [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L57],
> because this is the part that distinguishes ONE_PHASE from TWO_PHASE mode.
>
> After reading the code, we found this interesting fragment:
> {code:java}
> @Override
> public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out)
>         throws Exception {
>     for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
>         RowData currentKey = entry.getKey();
>         List<RowData> inputRows = entry.getValue();
> 
>         boolean firstRow = false;
> 
>         // step 1: get the accumulator for the current key
> 
>         // set current key to access state under the key
>         ctx.setCurrentKey(currentKey);
>         RowData acc = accState.value();
>         if (acc == null) {
>             // Don't create a new accumulator for a retraction message. This
>             // might happen if the retraction message is the first message for the
>             // key or after a state clean up.
>             Iterator<RowData> inputIter = inputRows.iterator();
>             while (inputIter.hasNext()) {
>                 RowData current = inputIter.next();
>                 if (isRetractMsg(current)) {
>                     inputIter.remove(); // remove all the beginning retraction messages
>                 } else {
>                     break;
>                 }
>             }
>             if (inputRows.isEmpty()) {
>                 return; // !!!! <--- this is bad !!!!
>             }
>             acc = function.createAccumulators();
>             firstRow = true;
>         }
>         // ...
>     }
> }
> {code}
> In this code we iterate over the whole bundle, key by key, and at some point do
> this:
> {code:java}
> if (inputRows.isEmpty()) {
>     return;
> }{code}
> Obviously, what was meant here is *continue* (i.e. finish with the current
> key and move on to the next one), not a full stop of the whole bundle.
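> The intended control flow is simply to skip the current key and keep iterating over
> the rest of the bundle, roughly like this (a sketch of the idea, not the actual patch):
> {code:java}
> for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
>     // ... strip the leading retraction messages for this key, as above ...
>     if (inputRows.isEmpty()) {
>         // Skip only this key; the remaining keys of the bundle must still be processed.
>         continue;
>     }
>     // ...
> }{code}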
>
>
> This line is reached when the bundle contains a key that has only retraction
> messages - in that case the code below leaves inputRows empty:
>
> {code:java}
> while (inputIter.hasNext()) {
>     RowData current = inputIter.next();
>     if (isRetractMsg(current)) {
>         inputIter.remove(); // remove all the beginning retraction messages
>     } else {
>         break;
>     }
> }{code}
>
> h2. Summary / conditions
> To summarize, the bug is triggered when:
> # Mini-batch is enabled
> # ONE_PHASE aggregation is in effect
> # A mini-batch bundle contains a key that has only retraction messages
> When these conditions are met, MiniBatchGroupAggFunction returns from finishBundle early
> and silently drops the records of all keys in the bundle that have not been processed yet.
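> To make the effect concrete, here is a self-contained toy model of the bundle iteration
> (plain Java, no Flink classes, all names made up). Key "a" carries only a retraction, so the
> early return fires and key "b" is never emitted:
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
> 
> public class EarlyReturnToyModel {
>     public static void main(String[] args) {
>         Map<String, List<String>> buffer = new LinkedHashMap<>();
>         buffer.put("a", new ArrayList<>(Arrays.asList("-U 1"))); // only retraction messages
>         buffer.put("b", new ArrayList<>(Arrays.asList("+I 2"))); // a normal insert
> 
>         for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
>             List<String> inputRows = entry.getValue();
>             // Drop retraction rows (the real code strips only the leading ones).
>             inputRows.removeIf(row -> row.startsWith("-"));
>             if (inputRows.isEmpty()) {
>                 return; // mirrors the buggy early return: nothing is printed for key "b"
>             }
>             System.out.println("emit key " + entry.getKey() + " -> " + inputRows);
>         }
>     }
> }{code}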
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)