[ https://issues.apache.org/jira/browse/FLINK-23106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated FLINK-23106:
---------------------------------
    Fix Version/s:     (was: 1.14.0)

> AppendOnlyTopNFunction should send retract first and then send insert
> ---------------------------------------------------------------------
>
>                 Key: FLINK-23106
>                 URL: https://issues.apache.org/jira/browse/FLINK-23106
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Jingsong Lee
>            Priority: Major
>
> Case:
>  * Source1 (changelog) -> Join
>  * Source2 -> AppendRank(rownum=1) -> Join
>  * Join(key is partitionKey) -> RetractRank(partition key is the PK of the changelog)
> RetractableTopNFunction then throws {{new RuntimeException("Can not retract a non-existent record. This should never happen.")}}.
>  
> In this case, the downstream operators produce wrong results, and even invalid retractions (retract messages for records that do not exist). This happens because:
>  * The rank outputs the partition key + rownum; here the rank has rownum=1.
>  * The downstream join sees that the rownum of the rank output is 1, so it treats the partition key as the PK and applies PK-based optimizations.
>  * Because rownum is selected, the rank assumes that the PK of the downstream data must be partition key + rownum, so it emits the retraction messages for a partition key out of order, which breaks the downstream PK-based optimization.
> Fix: the problem is in the rank implementation. It should not emit the retraction messages for a partition key out of order.
> Workaround: do not select the rownum of the rank output.
> The buggy code is:
> {code:java}
> while (iterator.hasNext() && isInRankEnd(currentRank)) {
>     Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
>     Collection<RowData> records = entry.getValue();
>     // meet its own sort key
>     if (!findsSortKey && entry.getKey().equals(sortKey)) {
>         currentRank += records.size();
>         currentRow = input;
>         findsSortKey = true;
>     } else if (findsSortKey) {
>         Iterator<RowData> recordsIter = records.iterator();
>         while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
>             RowData prevRow = recordsIter.next();
>             collectUpdateBefore(out, prevRow, currentRank);
>             collectUpdateAfter(out, currentRow, currentRank);
>             currentRow = prevRow;
>             currentRank += 1;
>         }
>     } else {
>         currentRank += records.size();
>     }
> }
> {code}
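
The ordering problem described above can be illustrated with a minimal, hypothetical Java model of a downstream operator that applies a PK-based optimization: it keeps at most one row per key and retracts by key alone. The names {{PkStateDemo}}, {{Change}}, {{Kind}} and {{PkState}} are made up for this sketch ({{Kind}} only mirrors the four change kinds of Flink's {{RowKind}}); it is not Flink's join or rank code. It shows that retract-then-insert leaves the expected row for the partition key, while insert-then-retract silently deletes the new row, so a later legitimate retraction fails with the same message as in RetractableTopNFunction.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a downstream operator that applies a
// PK-based optimization: because it believes the partition key is a primary
// key, it keeps at most one row per key and retracts by key alone.
// This is NOT Flink code; all names here are illustrative.
final class PkStateDemo {

    // Mirrors the four change kinds of a changelog (like Flink's RowKind).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String pk;
        final String value;

        Change(Kind kind, String pk, String value) {
            this.kind = kind;
            this.pk = pk;
            this.value = value;
        }
    }

    static final class PkState {
        private final Map<String, String> rows = new HashMap<>();

        void apply(Change c) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // Upsert: the new row replaces whatever is stored for the key.
                    rows.put(c.pk, c.value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retract by key only; the retracted value is not compared.
                    if (rows.remove(c.pk) == null) {
                        throw new RuntimeException(
                            "Can not retract a non-existent record. This should never happen.");
                    }
                    break;
            }
        }

        String get(String pk) {
            return rows.get(pk);
        }
    }

    // Applies the changes in order and returns the resulting state.
    static PkState replay(List<Change> changes) {
        PkState state = new PkState();
        for (Change c : changes) {
            state.apply(c);
        }
        return state;
    }

    public static void main(String[] args) {
        Change insertA = new Change(Kind.INSERT, "p1", "A");
        Change retractA = new Change(Kind.UPDATE_BEFORE, "p1", "A");
        Change insertX = new Change(Kind.UPDATE_AFTER, "p1", "X");
        Change retractX = new Change(Kind.UPDATE_BEFORE, "p1", "X");

        // Retract first, then insert: p1 ends up holding X, as expected.
        System.out.println(replay(Arrays.asList(insertA, retractA, insertX)).get("p1"));

        // Insert first, then retract: the late retraction of A deletes X.
        PkState broken = replay(Arrays.asList(insertA, insertX, retractA));
        System.out.println(broken.get("p1"));

        // A later, legitimate retraction of X now finds nothing for p1.
        try {
            broken.apply(retractX);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running {{main}} prints {{X}}, then {{null}}, then the "Can not retract a non-existent record" message: the corrupted key-based state only surfaces later, at an unrelated retraction.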



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
