[ https://issues.apache.org/jira/browse/FLINK-23106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee updated FLINK-23106:
---------------------------------
Fix Version/s: (was: 1.14.0)
> AppendOnlyTopNFunction should send retract first and then send insert
> ---------------------------------------------------------------------
>
> Key: FLINK-23106
> URL: https://issues.apache.org/jira/browse/FLINK-23106
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Jingsong Lee
> Priority: Major
>
> Case:
> Source1 ChangeLog -------------------->
> Source2 -> AppendRank(rownum=1) -> Join(key is partitionKey) ->
> RetractRank(partition key is the PK of the changelog)
> This pipeline then hits {{throw new RuntimeException("Can not retract a
> non-existent record. This should never happen.")}} in RetractableTopNFunction.
>
> In this case, the downstream operators produce wrong results, including
> invalid retractions (retract messages for records that do not exist):
> * The rank outputs partition key + rownum, and the rank here is restricted to
> rownum=1.
> * The downstream join sees that rownum is 1 in the rank output, so it assumes
> that the partition key is the PK and applies PK-based optimization.
> * Because rownum is selected, the rank assumes that the PK of the downstream
> data must be partition key + rownum. As a result, the retraction messages for
> a partition key are emitted out of order, which breaks the downstream
> PK-based optimization.
> Fix: the problem is in the rank implementation. It should not emit
> out-of-order retraction messages for a partition key.
> Workaround: do not select rownum from the rank output.
> The buggy code is:
> {code:java}
> while (iterator.hasNext() && isInRankEnd(currentRank)) {
>     Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
>     Collection<RowData> records = entry.getValue();
>     // meet its own sort key
>     if (!findsSortKey && entry.getKey().equals(sortKey)) {
>         currentRank += records.size();
>         currentRow = input;
>         findsSortKey = true;
>     } else if (findsSortKey) {
>         Iterator<RowData> recordsIter = records.iterator();
>         while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
>             RowData prevRow = recordsIter.next();
>             collectUpdateBefore(out, prevRow, currentRank);
>             collectUpdateAfter(out, currentRow, currentRank);
>             currentRow = prevRow;
>             currentRank += 1;
>         }
>     } else {
>         currentRank += records.size();
>     }
> }
> {code}
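To make the emitted message order concrete, here is a minimal, self-contained replay of the loop above. It is only a sketch under assumptions that are not in the issue: sort keys and rows are plain Integer/String values instead of RowData, the buffer is a TreeMap that already contains the new input, the class and method names (RankEmitOrderSketch, emit) are hypothetical, and isInRankEnd is assumed to mean rank <= rankEnd. The collectUpdateBefore/collectUpdateAfter calls are replaced by appending -U/+U strings to a list.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the rank function; not Flink code.
class RankEmitOrderSketch {

    // Assumption: a rank is inside the Top-N range when rank <= rankEnd.
    static boolean isInRankEnd(long rank, long rankEnd) {
        return rank <= rankEnd;
    }

    // Replays the loop from the issue on a sorted buffer that already
    // contains `input`, and returns the emitted messages in order.
    static List<String> emit(TreeMap<Integer, Collection<String>> buffer,
                             Integer sortKey, String input, long rankEnd) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Integer, Collection<String>>> iterator =
                buffer.entrySet().iterator();
        long currentRank = 0L;
        boolean findsSortKey = false;
        String currentRow = null;
        while (iterator.hasNext() && isInRankEnd(currentRank, rankEnd)) {
            Map.Entry<Integer, Collection<String>> entry = iterator.next();
            Collection<String> records = entry.getValue();
            if (!findsSortKey && entry.getKey().equals(sortKey)) {
                // Reached the sort key of the new input.
                currentRank += records.size();
                currentRow = input;
                findsSortKey = true;
            } else if (findsSortKey) {
                Iterator<String> recordsIter = records.iterator();
                while (recordsIter.hasNext() && isInRankEnd(currentRank, rankEnd)) {
                    String prevRow = recordsIter.next();
                    // Stand-ins for collectUpdateBefore / collectUpdateAfter.
                    out.add("-U(" + prevRow + ", rank=" + currentRank + ")");
                    out.add("+U(" + currentRow + ", rank=" + currentRank + ")");
                    currentRow = prevRow;
                    currentRank += 1;
                }
            } else {
                currentRank += records.size();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Top-2 buffer: "a" and "b" were ranked 1 and 2 before "c" arrived
        // with the smallest sort key.
        TreeMap<Integer, Collection<String>> buffer = new TreeMap<>();
        buffer.put(1, new ArrayList<>(List.of("c")));
        buffer.put(2, new ArrayList<>(List.of("a")));
        buffer.put(3, new ArrayList<>(List.of("b")));
        System.out.println(emit(buffer, 1, "c", 2));
    }
}
```

For this sample input the sketch prints [-U(a, rank=1), +U(c, rank=1), -U(b, rank=2), +U(a, rank=2)]: each retraction is immediately followed by an update at the same rank, so the retraction of b is emitted after the update for c. The sketch only shows this interleaving; it does not by itself establish that this is the exact ordering that breaks the downstream operator.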
--
This message was sent by Atlassian Jira
(v8.3.4#803005)