[ https://issues.apache.org/jira/browse/FLINK-23106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated FLINK-23106:
---------------------------------
    Description: 
Case:

Source1 -------ChangeLog---> 

Source2 -> AppendRank(rownum=1) -> Join(key is partitionKey) -> 
RetractRank(partition key is the pk of change log)

RetractableTopNFunction then fails with {{throw new RuntimeException("Can not
retract a non-existent record. This should never happen.")}}.
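To make the error concrete, here is a minimal Java sketch of the kind of bookkeeping that raises it: retract-side state that counts accumulated records and refuses to retract one it has never seen. The class {{RetractCheck}} and its methods are hypothetical; this is a simplified model under that assumption, not the actual code of RetractableTopNFunction.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of retract-side state. It counts how many
 * times each record has been accumulated. This is NOT Flink's
 * RetractableTopNFunction, only an illustration of the failing condition.
 */
class RetractCheck {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Accumulate (+I / +U) a record. */
    void accumulate(String record) {
        counts.merge(record, 1, Integer::sum);
    }

    /** Retract (-U / -D) a record; fails if it was never accumulated. */
    void retract(String record) {
        Integer n = counts.get(record);
        if (n == null) {
            throw new RuntimeException(
                    "Can not retract a non-existent record. This should never happen.");
        }
        if (n == 1) {
            counts.remove(record);
        } else {
            counts.put(record, n - 1);
        }
    }

    public static void main(String[] args) {
        RetractCheck state = new RetractCheck();
        state.accumulate("k,A");
        state.retract("k,A");         // fine: the record was seen before
        try {
            state.retract("k,A");     // nothing left to match this retraction
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The second retraction has no matching record in the state, so the sketch prints the same message as the exception quoted above.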


In this case the downstream operators produce wrong results, and even invalid
retractions (retract messages for records that do not exist):
 * The rank outputs the partition key plus rownum; here the rank only keeps
rownum=1.
 * The downstream join sees that rownum is always 1 in the rank output, so it
treats the partition key as the PK and applies PK-based optimizations.
 * Because rownum is selected, the rank assumes that the PK of its output is
partition key + rownum, so the retraction messages for the same partition key
are emitted out of order. This breaks the downstream PK-based optimizations.
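The ordering problem can be illustrated with a simplified model of an operator that trusts the partition key to be the PK: it stores one row per key, and a retraction removes whatever is stored under that key. The class {{PkKeyedState}} is hypothetical and is not Flink's join operator; it only shows why, under this assumption, the order of retract and insert for the same key decides whether the new row survives.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of an operator that assumes the partition key is a PK:
 * one row per key, and a retraction removes the row stored under that key.
 * A simplification for illustration, not Flink's join implementation.
 */
class PkKeyedState {
    enum Kind { INSERT, RETRACT }

    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> rowByKey = new HashMap<>();

    void apply(Change c) {
        if (c.kind == Kind.INSERT) {
            rowByKey.put(c.key, c.value);  // upsert: overwrite by key
        } else {
            rowByKey.remove(c.key);        // retract by key; the value is ignored
        }
    }

    /** Replays the changes and returns the row left for {@code key} (or null). */
    static String replay(List<Change> changes, String key) {
        PkKeyedState state = new PkKeyedState();
        for (Change c : changes) {
            state.apply(c);
        }
        return state.rowByKey.get(key);
    }

    public static void main(String[] args) {
        Change insertA = new Change(Kind.INSERT, "k", "A");
        Change retractA = new Change(Kind.RETRACT, "k", "A");
        Change insertB = new Change(Kind.INSERT, "k", "B");

        // The rank-1 row for partition key k changes from A to B.
        System.out.println(replay(List.of(insertA, retractA, insertB), "k")); // B
        System.out.println(replay(List.of(insertA, insertB, retractA), "k")); // null
    }
}
```

With the retraction sent first the state ends with B; with the insert sent first, the late retraction of A also wipes out B, and the operator's state no longer matches what it has emitted.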

Fix: the problem lies in the rank implementation. It must not emit the
retraction messages for a partition key out of order.
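A minimal sketch of the ordering the issue title asks for, assuming an append-only Top-1 (rownum=1) per partition key that keeps the row with the largest sort value. {{AppendOnlyTop1}}, its string-encoded output and the -U/+U labels are illustrative assumptions, not the AppendOnlyTopNFunction source. When a new row takes rank 1, the sketch emits the retraction of the old rank-1 row before the new row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical append-only Top-1 per partition key (largest sort value wins).
 * Not Flink's AppendOnlyTopNFunction: it only shows "retract first, then insert".
 */
class AppendOnlyTop1 {
    private final Map<String, Long> topByPartition = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void processElement(String partitionKey, long sortValue) {
        Long current = topByPartition.get(partitionKey);
        if (current == null) {
            topByPartition.put(partitionKey, sortValue);
            output.add("+I(" + partitionKey + "," + sortValue + ",1)");
        } else if (sortValue > current) {
            // Retract the old rank-1 row first ...
            output.add("-U(" + partitionKey + "," + current + ",1)");
            topByPartition.put(partitionKey, sortValue);
            // ... and only then emit the new rank-1 row.
            output.add("+U(" + partitionKey + "," + sortValue + ",1)");
        }
        // A row that does not beat the current top produces no output.
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        AppendOnlyTop1 top1 = new AppendOnlyTop1();
        top1.processElement("k", 5);
        top1.processElement("k", 3);
        top1.processElement("k", 9);
        System.out.println(top1.output());
    }
}
```

Running main prints [+I(k,5,1), -U(k,5,1), +U(k,9,1)]: the row with value 3 never reaches rank 1, and the change from 5 to 9 is sent retract first, so a downstream operator keyed on the partition key never sees the new row before the old one is withdrawn.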

Workaround: do not select the rownum column from the rank output.

  was:
Case:

Source1 -------ChangeLog---> 

Source2 -> Rank(rownum=1) -> Join(key is partitionKey) -> Other_Rank(partition 
key is the pk of change log)

RetractableTopNFunction then fails with {{throw new RuntimeException("Can not
retract a non-existent record. This should never happen.")}}.


In this case the downstream operators produce wrong results, and even invalid
retractions (retract messages for records that do not exist):
 * The rank outputs the partition key plus rownum; here the rank only keeps
rownum=1.
 * The downstream join sees that rownum is always 1 in the rank output, so it
treats the partition key as the PK and applies PK-based optimizations.
 * Because rownum is selected, the rank assumes that the PK of its output is
partition key + rownum, so the retraction messages for the same partition key
are emitted out of order. This breaks the downstream PK-based optimizations.

Fix: the problem lies in the rank implementation. It must not emit the
retraction messages for a partition key out of order.

Workaround: do not select the rownum column from the rank output.


> AppendOnlyTopNFunction should send retract first and then send insert
> ---------------------------------------------------------------------
>
>                 Key: FLINK-23106
>                 URL: https://issues.apache.org/jira/browse/FLINK-23106
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Jingsong Lee
>            Priority: Major
>             Fix For: 1.14.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
