[
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he closed FLINK-28986.
------------------------------
Resolution: Fixed
Fixed in master: bdffb6bd4ae3ea32bdcd6ec6bce6c6e0e8b92a11
> UNNEST function with nested filter fails to generate plan
> ---------------------------------------------------------
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: Jane Chan
> Assignee: Jane Chan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
> tEnv.executeSql(
> s"""
> |CREATE TEMPORARY TABLE source_kafka_wip_his_all (
> | GUID varchar,
> | OPERATION varchar,
> | PRODUCTID varchar,
> | LOTNO varchar,
> | SERIALNO varchar,
> | QUERYSERIALNO varchar,
> | SERIALNO1 varchar,
> | SERIALNO2 varchar,
> | WIPORDERNO varchar,
> | WIPORDERTYPE varchar,
> | VIRTUALLOT varchar,
> | PREOPERATION varchar,
> | NORMALPREOPERATION varchar,
> | PROCESSID varchar,
> | EQUIPMENT varchar,
> | INBOUNDDATE varchar,
> | OUTBOUNDDATE varchar,
> | REWORK varchar,
> | REWORKPROCESSID varchar,
> | CONTAINER varchar,
> | WIPCONTENTCLASSID varchar,
> | STATUSCODE varchar,
> | WIPSTATUS varchar,
> | TESTPROCESSID varchar,
> | TESTORDERTYPE varchar,
> | TESTORDER varchar,
> | TEST varchar,
> | SORTINGPROCESSID varchar,
> | SORTINGORDERTYPE varchar,
> | SORTINGORDER varchar,
> | SORTING varchar,
> | MINO varchar,
> | GROUPCODE varchar,
> | HIGHLOWGROUP varchar,
> | PRODUCTNO varchar,
> | FACILITY varchar,
> | WIPLINE varchar,
> | CHILDEQUCODE varchar,
> | STATION varchar,
> | QTY varchar,
> | PASS_FLAG varchar,
> | DEFECTCODELIST varchar,
> | ISFIRST varchar,
> | PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO
> string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
> | REFERENCEID varchar,
> | LASTUPDATEON varchar,
> | LASTUPDATEDBY varchar,
> | CREATEDON varchar,
> | CREATEDBY varchar,
> | ACTIVE varchar,
> | LASTDELETEON varchar,
> | LASTDELETEDBY varchar,
> | LASTREACTIVATEON varchar,
> | LASTREACTIVATEDBY varchar,
> | ARCHIVEID varchar,
> | LASTARCHIVEON varchar,
> | LASTARCHIVEDBY varchar,
> | LASTRESTOREON varchar,
> | LASTRESTOREDBY varchar,
> | ROWVERSIONSTAMP varchar,
> | proctime as PROCTIME()
> | ) with (
> | 'connector' = 'datagen'
> |)
> |""".stripMargin)
> tEnv.executeSql(
> s"""
> |create TEMPORARY view transform_main_data as
> |select
> | r.GUID as wip_his_guid,
> | r.EQUIPMENT as equipment,
> | r.WIPLINE as wipline,
> | r.STATION as station,
> | cast(r.PROCESSID as decimal) as processid,
> | r.PRODUCTNO as productno,
> | t.TESTFINISHDATE as testfinishdate,
> | t.OPERATION as operation,
> | t.CHARACTERISTIC as characteristic,
> | t.LOWERCONTROLLIMIT as lowercontrollimit,
> | t.UPPERCONTROLLIMIT as uppercontrollimit,
> | t.TARGETVALUE as targetvalue,
> | t.DEFECTCODE as defectcode,
> | t.TESTVALUE as testvalue,
> | t.CHARACTERISTICTYPE as characteristictype,
> | proctime
> | from
> | (select
> | GUID,
> | EQUIPMENT,
> | WIPLINE,
> | STATION,
> | PROCESSID,
> | PRODUCTNO,
> | PARALIST,
> | proctime
> | FROM source_kafka_wip_his_all) r
> | cross join
> | unnest(PARALIST) as t
> (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP)
> | where t.CHARACTERISTICTYPE = '2'
> |""".stripMargin)
> tEnv.executeSql(
> s"""
> |explain plan for
> |select * from transform_main_data
> |where operation not in
> ('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216')
> |""".stripMargin).print()
> } {code}
> Stacktrace
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution
> plan for the given query: LogicalProject(inputs=[0..3],
> exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20,
> $15, $7]])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
> requiredColumns=[{6}])
> :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43,
> PROCTIME()]])
> : +- LogicalTableScan(table=[[default_catalog, default_database,
> source_kafka_wip_his_all]])
> +- LogicalFilter(condition=[AND(SEARCH($7,
> Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3,
> Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'),
> (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'),
> (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'),
> (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'),
> (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'),
> (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5)
> CHARACTER SET "UTF-16LE"))])
> +- Uncollect
> +- LogicalProject(exprs=[[$cor1.PARALIST]])
> +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0
> }]])This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> org.apache.flink.table.api.TableException: Cannot generate a valid execution
> plan for the given query:
> LogicalProject(inputs=[0..3], exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11,
> $13, $19, $17, $18, $25, $20, $15, $7]])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
> requiredColumns=[{6}])
> :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43,
> PROCTIME()]])
> : +- LogicalTableScan(table=[[default_catalog, default_database,
> source_kafka_wip_his_all]])
> +- LogicalFilter(condition=[AND(SEARCH($7,
> Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3,
> Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'),
> (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'),
> (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'),
> (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'),
> (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'),
> (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5)
> CHARACTER SET "UTF-16LE"))])
> +- Uncollect
> +- LogicalProject(exprs=[[$cor1.PARALIST]])
> +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0
> }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:527)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:96)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:695)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1356)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:733)
> at
> org.apache.flink.table.api.TableEnvironmentITCase.debug(TableEnvironmentITCase.scala:695)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There
> are not enough rules to produce a node with desired properties:
> convention=LOGICAL, FlinkRelDistributionTraitDef=any,
> MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE],
> UpdateKindTraitDef=[NONE].
> Missing conversion is Uncollect[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#485:RelSubset#4.LOGICAL.any.None:
> 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 460:Uncollect
> 458:LogicalProject(subset=[rel#459:RelSubset#3.NONE.any.None:
> 0.[NONE].[NONE]], PARALIST=[$cor1.PARALIST])
> 17:LogicalValues(subset=[rel#457:RelSubset#2.NONE.any.None:
> 0.[NONE].[NONE]], tuples=[[{ 0 }]]){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)