[ 
https://issues.apache.org/jira/browse/FLINK-35792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896176#comment-17896176
 ] 

xuyang commented on FLINK-35792:
--------------------------------

Hi [~jhughes],

I have some uncertainties regarding the solution proposed in this issue and 
would like to discuss it with you to gather your thoughts. Please feel free to 
correct me if I'm mistaken.

To provide some context: in streaming, overagg, rank, and deduplicate are 
interrelated optimizations. In overagg, the order key must be a single time 
attr column (either proctime or rowtime) in ascending order. Rank supports 
ordering by both regular and time attr columns (although ordering by proctime 
is currently broken, as described in this issue). Deduplicate only supports 
ordering by a single time attr column, and only for top-1.
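
As a rough illustration of the constraints above, here is a toy model in Java. 
It is not planner code: OrderKeySupport, KeyKind, OrderKey, and the three 
*Supports methods are names I made up for this sketch, and it only encodes the 
rules as stated in the previous paragraph.

```java
import java.util.List;

class OrderKeySupport {
    // Hypothetical classification of an ORDER BY column; not a Flink type.
    enum KeyKind { REGULAR, PROCTIME, ROWTIME }

    // Hypothetical stand-in for one order key and its sort direction.
    static final class OrderKey {
        final KeyKind kind;
        final boolean ascending;

        OrderKey(KeyKind kind, boolean ascending) {
            this.kind = kind;
            this.ascending = ascending;
        }

        boolean isTimeAttr() {
            return kind != KeyKind.REGULAR;
        }
    }

    // overagg: exactly one time attr key, and it must be ascending.
    static boolean overAggSupports(List<OrderKey> keys) {
        return keys.size() == 1
                && keys.get(0).isTimeAttr()
                && keys.get(0).ascending;
    }

    // deduplicate: exactly one time attr key, and only for top-1.
    static boolean deduplicateSupports(List<OrderKey> keys, int topN) {
        return topN == 1 && keys.size() == 1 && keys.get(0).isTimeAttr();
    }

    // rank: any non-empty mix of regular and time attr keys.
    static boolean rankSupports(List<OrderKey> keys) {
        return !keys.isEmpty();
    }
}
```

For the query in this issue (ORDER BY b, proctime DESC with rank_num = 1), the 
key list is [REGULAR asc, PROCTIME desc], so in this model only rankSupports 
accepts it. That is why the query ends up in rank rather than in overagg or 
deduplicate.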

Now, let's discuss various potential solutions:

1. ({color:#FF0000}Recommended{color}) Directly throw an error in rank, with a 
message suggesting that users order by a function such as now() instead.
2. ({color:#FF0000}Recommended{color}) In this case, avoid converting overagg 
to rank, and let the existing overagg error handling reject the query in the 
subsequent process.
3. ({color:#FF0000}Less Recommended{color}) Apply special handling for proctime 
within rank. The possible approaches I can think of include:
3.1 Adding a special internal column to materialize the proctime (and 
potentially the rowtime?) before rank.
3.2 Introducing proctime-related logic in the various rank functions.
4. ({color:#FF0000}Not Recommended{color}) Materializing proctime before rank 
within FlinkRelTimeIndicatorProgram. However, historically, overagg, rank, and 
deduplicate have never materialized time attr columns, which allows users to 
keep using these columns for windowing and similar operations afterward. 
Therefore, I'd like to avoid disrupting this behavior and keep rank, overagg, 
and deduplicate consistent.
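
To make option 1 more concrete, here is a minimal sketch of the kind of check I 
have in mind. It is only an illustration under my own assumptions: 
RankOrderKeyCheck, OrderKey, and validate are hypothetical names, not existing 
planner classes, and the error message wording is made up.

```java
import java.util.List;

class RankOrderKeyCheck {
    // Hypothetical stand-in for a rank order key; not a Flink planner class.
    static final class OrderKey {
        final String column;
        final boolean proctime;

        OrderKey(String column, boolean proctime) {
            this.column = column;
            this.proctime = proctime;
        }
    }

    // Fails fast if any rank order key is a proctime attribute.
    // The message text is an assumption, not an existing Flink error.
    static void validate(List<OrderKey> keys) {
        for (OrderKey key : keys) {
            if (key.proctime) {
                throw new IllegalArgumentException(
                        "Rank does not support ordering by the proctime column '"
                                + key.column
                                + "'. Consider ordering by a function such as"
                                + " NOW() instead.");
            }
        }
    }
}
```

With the query from this issue, validate would throw for the key list 
[b, proctime] and pass for [b] alone, so users get an explicit error instead of 
an order key that is silently always null.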

I'm looking forward to your thoughts.

> Sorting by proctime does not work in rank
> -----------------------------------------
>
>                 Key: FLINK-35792
>                 URL: https://issues.apache.org/jira/browse/FLINK-35792
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>            Reporter: xuyang
>            Assignee: xuyang
>            Priority: Major
>
> Take the following SQL as an example:
> {code:java}
> @Test
> def test(): Unit = {
>   val sql =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT a, b, c,
>       |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as rank_num
>       |  FROM MyTable)
>       |WHERE rank_num = 1
>     """.stripMargin
>   // This rank can't be converted into Deduplicate because it also uses `b`
>   // as an order key.
>   util.verifyExecPlan(sql)
> } {code}
> The rank node does not materialize `proctime` in 
> `RelTimeIndicatorConverter`, so the order key `proctime` is always null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
