[
https://issues.apache.org/jira/browse/FLINK-21893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andy updated FLINK-21893:
-------------------------
Description:
A ValidationException is thrown if the partition key of a Rank is an
expression computed by its input node. For example, running the following
SQL throws a ValidationException.
{code:scala}
//代码占位符
@Test
def test(): Unit = {
val data = List(
(2001L, 2L),
(2002L, 3L)
)
val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt,
'proctime.proctime)
tEnv.registerTable("T", ds)
val sql =
"""
|SELECT
| video_id,
| cnt,
| rownum_2
|FROM
|(
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| ORDER BY cnt DESC
| ) AS rownum_2
| FROM
| (
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| PARTITION BY bucket_id
| ORDER BY cnt DESC
| ) AS rownum_1
| FROM
| (
| SELECT
| video_id,
| cnt,
| MOD(video_id, 64) as bucket_id
| FROM T
| )
| )
| WHERE rownum_1 <= 1000
|)
|WHERE rownum_2 <= 1000
|""".stripMargin
val sink = new TestingRetractSink
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
}
{code}
Exception detail
{code:java}
//代码占位符
org.apache.flink.table.api.ValidationException: Field names must be unique.
Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field
names must be unique. Found duplicates: [$2]
at
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277)
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at
org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at
org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
{code}
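When `FlinkLogicalRankRuleBase` is applied to a
`FlinkLogicalCalc`-`FlinkLogicalOverAggregate` pair, it produces a
`FlinkLogicalRank` with the following row type, and a `StreamExecRank` with
the same row type is then generated. That row type contains duplicate `$2`
field names, which `RowType.validateFields` rejects when
`StreamExecRank.translateToPlanInternal` converts it to a logical row type: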
!image-2021-03-21-21-07-43-230.png!
was:
A ValidationException is thrown when `deriveRowType` is called on a Rank whose
partition key is an expression computed by its input node. For example,
running the following SQL throws a ValidationException.
{code:scala}
//代码占位符
@Test
def test(): Unit = {
val data = List(
(2001L, 2L),
(2002L, 3L)
)
val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt,
'proctime.proctime)
tEnv.registerTable("T", ds)
val sql =
"""
|SELECT
| video_id,
| cnt,
| rownum_2
|FROM
|(
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| ORDER BY cnt DESC
| ) AS rownum_2
| FROM
| (
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| PARTITION BY bucket_id
| ORDER BY cnt DESC
| ) AS rownum_1
| FROM
| (
| SELECT
| video_id,
| cnt,
| MOD(video_id, 64) as bucket_id
| FROM T
| )
| )
| WHERE rownum_1 <= 1000
|)
|WHERE rownum_2 <= 1000
|""".stripMargin
val sink = new TestingRetractSink
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
}
{code}
Exception detail
{code:java}
//代码占位符
org.apache.flink.table.api.ValidationException: Field names must be unique.
Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field
names must be unique. Found duplicates: [$2]
at
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277)
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at
org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at
org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
{code}
> A ValidationException will be thrown out if partition key of Rank is an
> expression result of input Node.
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21893
> URL: https://issues.apache.org/jira/browse/FLINK-21893
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Andy
> Priority: Major
> Attachments: image-2021-03-21-21-07-43-230.png
>
>
> A ValidationException is thrown if the partition key of a Rank is an
> expression computed by its input node. For example, running the following
> SQL throws a ValidationException.
>
> {code:scala}
> //代码占位符
> @Test
> def test(): Unit = {
> val data = List(
> (2001L, 2L),
> (2002L, 3L)
> )
> val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt,
> 'proctime.proctime)
> tEnv.registerTable("T", ds)
> val sql =
> """
> |SELECT
> | video_id,
> | cnt,
> | rownum_2
> |FROM
> |(
> | SELECT
> | video_id,
> | cnt,
> | ROW_NUMBER() OVER (
> | ORDER BY cnt DESC
> | ) AS rownum_2
> | FROM
> | (
> | SELECT
> | video_id,
> | cnt,
> | ROW_NUMBER() OVER (
> | PARTITION BY bucket_id
> | ORDER BY cnt DESC
> | ) AS rownum_1
> | FROM
> | (
> | SELECT
> | video_id,
> | cnt,
> | MOD(video_id, 64) as bucket_id
> | FROM T
> | )
> | )
> | WHERE rownum_1 <= 1000
> |)
> |WHERE rownum_2 <= 1000
> |""".stripMargin
> val sink = new TestingRetractSink
> tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
> env.execute()
> }
> {code}
> Exception detail
> {code:java}
> //代码占位符
> org.apache.flink.table.api.ValidationException: Field names must be unique.
> Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field
> names must be unique. Found duplicates: [$2]
> at
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277)
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at
> org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at
> org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> {code}
>
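> When `FlinkLogicalRankRuleBase` is applied to a
> `FlinkLogicalCalc`-`FlinkLogicalOverAggregate` pair, it produces a
> `FlinkLogicalRank` with the following row type, and a `StreamExecRank` with
> the same row type is then generated. That row type contains duplicate `$2`
> field names, which `RowType.validateFields` rejects when
> `StreamExecRank.translateToPlanInternal` converts it to a logical row type: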
> !image-2021-03-21-21-07-43-230.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)