[ 
https://issues.apache.org/jira/browse/FLINK-23106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-23106:
---------------------------------
    Description: 
Case:

Source1 (ChangeLog) ------------------------------>

Source2 -> AppendRank(rownum=1) -> Join(key is partitionKey) -> 
RetractRank(partition key is the PK of the changelog)

RetractableTopNFunction then hits {{throw new RuntimeException("Can not 
retract a non-existent record. This should never happen.")}}.

 

In this case the downstream produces wrong results, including invalid 
retractions (retract messages for records that do not exist):
 * The rank outputs partition key + rownum; the rank here has rownum=1.
 * The downstream join sees that rownum in the rank output is 1, so it 
assumes the partition key is the PK and applies PK-based optimization.
 * Because rownum is selected, the rank treats the PK of its output as 
partition key + rownum. The retraction messages for a given partition key 
are therefore out of order, which breaks the downstream PK-based optimization.
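
To illustrate why message order matters to a consumer that keys its state by 
PK, here is a minimal sketch. It is not Flink code: the class 
PkOrderingSketch, the helper PkState, and the string rows are all 
hypothetical, and the consumer is assumed to keep exactly one row per PK.

```java
import java.util.HashMap;
import java.util.Map;

public class PkOrderingSketch {

    /** Hypothetical consumer state: exactly one row per primary key. */
    static final class PkState {
        private final Map<String, String> rowByPk = new HashMap<>();

        void insert(String pk, String row) {
            rowByPk.put(pk, row);
        }

        void retract(String pk, String row) {
            // Map.remove(key, value) removes only if the stored row matches.
            if (!rowByPk.remove(pk, row)) {
                throw new IllegalStateException(
                        "retracting a row that is not stored: " + row);
            }
        }

        String get(String pk) {
            return rowByPk.get(pk);
        }
    }

    /** Retract the old row first, then insert the new one. */
    public static String applyInOrder() {
        PkState state = new PkState();
        state.insert("k", "old");
        state.retract("k", "old");
        state.insert("k", "new");
        return state.get("k");
    }

    /** Insert the new row first, then retract the old one. */
    public static String applyOutOfOrder() {
        PkState state = new PkState();
        state.insert("k", "old");
        state.insert("k", "new"); // overwrites "old" under the same PK
        try {
            state.retract("k", "old"); // "old" is no longer stored
            return state.get("k");
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(applyInOrder());
        System.out.println(applyOutOfOrder());
    }
}
```

In this sketch the in-order sequence leaves "new" stored under the key, while 
the out-of-order sequence tries to retract a row that was already 
overwritten, which is the same kind of failure as the exception quoted above.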

Fix: the problem is in the rank implementation. It should not emit 
out-of-order retraction messages for a partition key.

Workaround: do not select rownum from the rank output.

The buggy code is:
{code:java}
while (iterator.hasNext() && isInRankEnd(currentRank)) {
    Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
    Collection<RowData> records = entry.getValue();
    // meet its own sort key
    if (!findsSortKey && entry.getKey().equals(sortKey)) {
        currentRank += records.size();
        currentRow = input;
        findsSortKey = true;
    } else if (findsSortKey) {
        Iterator<RowData> recordsIter = records.iterator();
        while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
            RowData prevRow = recordsIter.next();
            collectUpdateBefore(out, prevRow, currentRank);
            collectUpdateAfter(out, currentRow, currentRank);
            currentRow = prevRow;
            currentRank += 1;
        }
    } else {
        currentRank += records.size();
    }
}
{code}

  was:
Case:

Source1 -------ChangeLog---> 

Source2 -> AppendRank(rownum=1) -> Join(key is partitionKey) -> 
RetractRank(partition key is the pk of change log)

Then will be {{throw new RuntimeException( "Can not retract a non-existent 
record. This should never happen.")}} in RetractableTopNFunction.

 

In this case, the downstream will produce the wrong result, even the wrong 
retraction (retract message that does not exist)
 * Rank output partition key + rownum , the rank here has row_ rownum=1
 * In the downstream join judgment rank output, rownum is 1, so assuming 
partition key is PK, do PK based optimization
 * Because select rownum, rank thinks that the PK of downstream data must be 
partition key + rownum, the retraction message of partition key is out of 
order, leading to problems in downstream PK based optimization

Fix: there is a problem with the rank implementation. It should not result in 
disordered retraction messages of partition key.

Bypass: do not select rownum of rank output

The bug code is:
{code:java}
while (iterator.hasNext() && isInRankEnd(currentRank)) {
    Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
    Collection<RowData> records = entry.getValue();
    // meet its own sort key
    if (!findsSortKey && entry.getKey().equals(sortKey)) {
        currentRank += records.size();
        currentRow = input;
        findsSortKey = true;
    } else if (findsSortKey) {
        Iterator<RowData> recordsIter = records.iterator();
        while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
            RowData prevRow = recordsIter.next();
            collectUpdateBefore(out, prevRow, currentRank);
            collectUpdateAfter(out, currentRow, currentRank);
            currentRow = prevRow;
            currentRank += 1;
        }
    } else {
        currentRank += records.size();
    }
}
{code}


> AppendOnlyTopNFunction should send retract first and then send insert
> ---------------------------------------------------------------------
>
>                 Key: FLINK-23106
>                 URL: https://issues.apache.org/jira/browse/FLINK-23106
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Jingsong Lee
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Case:
> Source1 (ChangeLog) ------------------------------>
> Source2 -> AppendRank(rownum=1) -> Join(key is partitionKey) -> 
> RetractRank(partition key is the PK of the changelog)
> RetractableTopNFunction then hits {{throw new RuntimeException("Can not 
> retract a non-existent record. This should never happen.")}}.
>  
> In this case the downstream produces wrong results, including invalid 
> retractions (retract messages for records that do not exist):
>  * The rank outputs partition key + rownum; the rank here has rownum=1.
>  * The downstream join sees that rownum in the rank output is 1, so it 
> assumes the partition key is the PK and applies PK-based optimization.
>  * Because rownum is selected, the rank treats the PK of its output as 
> partition key + rownum. The retraction messages for a given partition key 
> are therefore out of order, which breaks the downstream PK-based optimization.
> Fix: the problem is in the rank implementation. It should not emit 
> out-of-order retraction messages for a partition key.
> Workaround: do not select rownum from the rank output.
> The buggy code is:
> {code:java}
> while (iterator.hasNext() && isInRankEnd(currentRank)) {
>     Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
>     Collection<RowData> records = entry.getValue();
>     // meet its own sort key
>     if (!findsSortKey && entry.getKey().equals(sortKey)) {
>         currentRank += records.size();
>         currentRow = input;
>         findsSortKey = true;
>     } else if (findsSortKey) {
>         Iterator<RowData> recordsIter = records.iterator();
>         while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
>             RowData prevRow = recordsIter.next();
>             collectUpdateBefore(out, prevRow, currentRank);
>             collectUpdateAfter(out, currentRow, currentRank);
>             currentRow = prevRow;
>             currentRank += 1;
>         }
>     } else {
>         currentRank += records.size();
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
