[ 
https://issues.apache.org/jira/browse/FLINK-21893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy updated FLINK-21893:
-------------------------
    Description: 
A ValidationException is thrown if the partition key of a Rank is an 
expression result of its input node. For example, running the following SQL 
throws a ValidationException.

 
{code:java}
//代码占位符
@Test
def test(): Unit = {
  val data = List(
    (2001L, 2L),
    (2002L, 3L)
  )

  val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 
'proctime.proctime)
  tEnv.registerTable("T", ds)

  val sql =
    """
      |SELECT
      |  video_id,
      |  cnt,
      |  rownum_2
      |FROM
      |(
      |  SELECT
      |    video_id,
      |    cnt,
      |    ROW_NUMBER() OVER (
      |      ORDER BY cnt DESC
      |    ) AS rownum_2
      |    FROM
      |    (
      |      SELECT
      |        video_id,
      |        cnt,
      |        ROW_NUMBER() OVER (
      |          PARTITION BY bucket_id
      |          ORDER BY cnt DESC
      |        ) AS rownum_1
      |        FROM
      |        (
      |           SELECT
      |             video_id,
      |             cnt,
      |             MOD(video_id, 64) as bucket_id
      |             FROM T
      |         )
      |    )
      |    WHERE rownum_1 <= 1000
      |)
      |WHERE rownum_2 <= 1000
      |""".stripMargin

  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
}
{code}
Exception detail
{code:java}
//代码占位符
org.apache.flink.table.api.ValidationException: Field names must be unique. 
Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field 
names must be unique. Found duplicates: [$2]
 at 
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277) 
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at 
org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 {code}
 

When `FlinkLogicalRankRuleBase` is applied to 
`FlinkLogicalCalc`-`FlinkLogicalOverAggregate`, a `FlinkLogicalRank` with the 
following rowType is produced. A `StreamExecRank` with the same rowType is 
then generated:

!image-2021-03-21-21-07-43-230.png!

  was:
A ValidationException is thrown in deriveRowType of Rank if the partition 
key is an expression result of the input node. For example, running the 
following SQL throws a ValidationException.

 
{code:java}
//代码占位符
@Test
def test(): Unit = {
  val data = List(
    (2001L, 2L),
    (2002L, 3L)
  )

  val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 
'proctime.proctime)
  tEnv.registerTable("T", ds)

  val sql =
    """
      |SELECT
      |  video_id,
      |  cnt,
      |  rownum_2
      |FROM
      |(
      |  SELECT
      |    video_id,
      |    cnt,
      |    ROW_NUMBER() OVER (
      |      ORDER BY cnt DESC
      |    ) AS rownum_2
      |    FROM
      |    (
      |      SELECT
      |        video_id,
      |        cnt,
      |        ROW_NUMBER() OVER (
      |          PARTITION BY bucket_id
      |          ORDER BY cnt DESC
      |        ) AS rownum_1
      |        FROM
      |        (
      |           SELECT
      |             video_id,
      |             cnt,
      |             MOD(video_id, 64) as bucket_id
      |             FROM T
      |         )
      |    )
      |    WHERE rownum_1 <= 1000
      |)
      |WHERE rownum_2 <= 1000
      |""".stripMargin

  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
}
{code}
Exception detail
{code:java}
//代码占位符
org.apache.flink.table.api.ValidationException: Field names must be unique. 
Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field 
names must be unique. Found duplicates: [$2]
 at 
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277) 
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at 
org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 {code}
 


> A ValidationException will be thrown out if partition key of Rank is an 
> expression result of input Node.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21893
>                 URL: https://issues.apache.org/jira/browse/FLINK-21893
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Andy
>            Priority: Major
>         Attachments: image-2021-03-21-21-07-43-230.png
>
>
> A ValidationException is thrown if the partition key of a Rank is an 
> expression result of its input node. For example, running the following SQL 
> throws a ValidationException.
>  
> {code:java}
> //代码占位符
> @Test
> def test(): Unit = {
>   val data = List(
>     (2001L, 2L),
>     (2002L, 3L)
>   )
>   val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 
> 'proctime.proctime)
>   tEnv.registerTable("T", ds)
>   val sql =
>     """
>       |SELECT
>       |  video_id,
>       |  cnt,
>       |  rownum_2
>       |FROM
>       |(
>       |  SELECT
>       |    video_id,
>       |    cnt,
>       |    ROW_NUMBER() OVER (
>       |      ORDER BY cnt DESC
>       |    ) AS rownum_2
>       |    FROM
>       |    (
>       |      SELECT
>       |        video_id,
>       |        cnt,
>       |        ROW_NUMBER() OVER (
>       |          PARTITION BY bucket_id
>       |          ORDER BY cnt DESC
>       |        ) AS rownum_1
>       |        FROM
>       |        (
>       |           SELECT
>       |             video_id,
>       |             cnt,
>       |             MOD(video_id, 64) as bucket_id
>       |             FROM T
>       |         )
>       |    )
>       |    WHERE rownum_1 <= 1000
>       |)
>       |WHERE rownum_2 <= 1000
>       |""".stripMargin
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
> }
> {code}
> Exception detail
> {code:java}
> //代码占位符
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field 
> names must be unique. Found duplicates: [$2]
>  at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277) 
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at 
> org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at 
> org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  {code}
>  
> When `FlinkLogicalRankRuleBase` is applied to 
> `FlinkLogicalCalc`-`FlinkLogicalOverAggregate`, a `FlinkLogicalRank` with the 
> following rowType is produced. A `StreamExecRank` with the same rowType is 
> then generated:
> !image-2021-03-21-21-07-43-230.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to