[https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580080#comment-17580080]
Jane Chan edited comment on FLINK-28986 at 8/16/22 6:37 AM:
------------------------------------------------------------
The reason is that during the decorrelation rewrite, the top Filter rel node
is pushed down, producing a nested Filter(Filter(...)) pattern. Since no
filter-merge rule is included in the default rewrite rule sets, the nested
Filter rel nodes prevent LogicalUnnestRule from matching. This can be fixed by
adding CoreRules.FILTER_MERGE to FlinkStreamRuleSets. cc [~godfrey]
!image-2022-08-16-14-36-07-061.png|width=1047,height=177!
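For illustration, a minimal sketch of where the rule would go (the object and
rule-set names below are placeholders, not the actual FlinkStreamRuleSets
layout):
{code:scala}
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

// Sketch only: FlinkStreamRuleSets defines several rule sets; the point is
// that CoreRules.FILTER_MERGE must be part of the rewrite phase so the nested
// Filter(Filter(Uncollect)) produced by decorrelation collapses into a single
// Filter before LogicalUnnestRule tries to match the Correlate.
object FilterMergeSketch {
  val REWRITE_RULES: RuleSet = RuleSets.ofList(
    CoreRules.FILTER_MERGE // merges adjacent LogicalFilter nodes into one
  )
}
{code}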
> UNNEST function with nested filter fails to generate plan
> ---------------------------------------------------------
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: Jane Chan
> Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> Add the following test case to TableEnvironmentITCase:
> {code:scala}
> @Test
> def debug(): Unit = {
> tEnv.executeSql(
> s"""
> |CREATE TEMPORARY TABLE source_kafka_wip_his_all (
> | GUID varchar,
> | OPERATION varchar,
> | PRODUCTID varchar,
> | LOTNO varchar,
> | SERIALNO varchar,
> | QUERYSERIALNO varchar,
> | SERIALNO1 varchar,
> | SERIALNO2 varchar,
> | WIPORDERNO varchar,
> | WIPORDERTYPE varchar,
> | VIRTUALLOT varchar,
> | PREOPERATION varchar,
> | NORMALPREOPERATION varchar,
> | PROCESSID varchar,
> | EQUIPMENT varchar,
> | INBOUNDDATE varchar,
> | OUTBOUNDDATE varchar,
> | REWORK varchar,
> | REWORKPROCESSID varchar,
> | CONTAINER varchar,
> | WIPCONTENTCLASSID varchar,
> | STATUSCODE varchar,
> | WIPSTATUS varchar,
> | TESTPROCESSID varchar,
> | TESTORDERTYPE varchar,
> | TESTORDER varchar,
> | TEST varchar,
> | SORTINGPROCESSID varchar,
> | SORTINGORDERTYPE varchar,
> | SORTINGORDER varchar,
> | SORTING varchar,
> | MINO varchar,
> | GROUPCODE varchar,
> | HIGHLOWGROUP varchar,
> | PRODUCTNO varchar,
> | FACILITY varchar,
> | WIPLINE varchar,
> | CHILDEQUCODE varchar,
> | STATION varchar,
> | QTY varchar,
> | PASS_FLAG varchar,
> | DEFECTCODELIST varchar,
> | ISFIRST varchar,
> | PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
> | REFERENCEID varchar,
> | LASTUPDATEON varchar,
> | LASTUPDATEDBY varchar,
> | CREATEDON varchar,
> | CREATEDBY varchar,
> | ACTIVE varchar,
> | LASTDELETEON varchar,
> | LASTDELETEDBY varchar,
> | LASTREACTIVATEON varchar,
> | LASTREACTIVATEDBY varchar,
> | ARCHIVEID varchar,
> | LASTARCHIVEON varchar,
> | LASTARCHIVEDBY varchar,
> | LASTRESTOREON varchar,
> | LASTRESTOREDBY varchar,
> | ROWVERSIONSTAMP varchar,
> | proctime as PROCTIME()
> | ) with (
> | 'connector' = 'datagen'
> |)
> |""".stripMargin)
> tEnv.executeSql(
> s"""
> |create TEMPORARY view transform_main_data as
> |select
> | r.GUID as wip_his_guid,
> | r.EQUIPMENT as equipment,
> | r.WIPLINE as wipline,
> | r.STATION as station,
> | cast(r.PROCESSID as decimal) as processid,
> | r.PRODUCTNO as productno,
> | t.TESTFINISHDATE as testfinishdate,
> | t.OPERATION as operation,
> | t.CHARACTERISTIC as characteristic,
> | t.LOWERCONTROLLIMIT as lowercontrollimit,
> | t.UPPERCONTROLLIMIT as uppercontrollimit,
> | t.TARGETVALUE as targetvalue,
> | t.DEFECTCODE as defectcode,
> | t.TESTVALUE as testvalue,
> | t.CHARACTERISTICTYPE as characteristictype,
> | proctime
> | from
> | (select
> | GUID,
> | EQUIPMENT,
> | WIPLINE,
> | STATION,
> | PROCESSID,
> | PRODUCTNO,
> | PARALIST,
> | proctime
> | FROM source_kafka_wip_his_all) r
> | cross join
> | unnest(PARALIST) as t (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP)
> | where t.CHARACTERISTICTYPE = '2'
> |""".stripMargin)
> tEnv.executeSql(
> s"""
> |explain plan for
> |select * from transform_main_data
> |where operation not in ('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216')
> |""".stripMargin).print()
> } {code}
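> A stripped-down variant of the repro (a sketch, untested: the table name t_src and the two-field row type are invented for illustration) that should hit the same pattern of a filter inside the unnesting view plus a filter on top of it:
> {code:scala}
> // Same shape as the repro above: UNNEST over an array of rows, one filter
> // on the unnested column inside the subquery, one filter stacked on top.
> tEnv.executeSql(
>   s"""
>      |CREATE TEMPORARY TABLE t_src (
>      |  id STRING,
>      |  items ARRAY<ROW<name STRING, kind STRING>>
>      |) WITH ('connector' = 'datagen')
>      |""".stripMargin)
> tEnv.executeSql(
>   s"""
>      |EXPLAIN PLAN FOR
>      |SELECT * FROM (
>      |  SELECT r.id, t.name, t.kind
>      |  FROM t_src r CROSS JOIN UNNEST(r.items) AS t (name, kind)
>      |  WHERE t.kind = '2'
>      |) WHERE name NOT IN ('a', 'b')
>      |""".stripMargin).print()
> {code}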
> h3. Stacktrace
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
> LogicalProject(inputs=[0..3], exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20, $15, $7]])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{6}])
>    :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, PROCTIME()]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, source_kafka_wip_his_all]])
>    +- LogicalFilter(condition=[AND(SEARCH($7, Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'), (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'), (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'), (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'), (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'), (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5) CHARACTER SET "UTF-16LE"))])
>       +- Uncollect
>          +- LogicalProject(exprs=[[$cor1.PARALIST]])
>             +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:527)
> at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:96)
> at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:695)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1356)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:733)
> at org.apache.flink.table.api.TableEnvironmentITCase.debug(TableEnvironmentITCase.scala:695)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=LOGICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
> Missing conversion is Uncollect[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#485:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 460:Uncollect
>   458:LogicalProject(subset=[rel#459:RelSubset#3.NONE.any.None: 0.[NONE].[NONE]], PARALIST=[$cor1.PARALIST])
>     17:LogicalValues(subset=[rel#457:RelSubset#2.NONE.any.None: 0.[NONE].[NONE]], tuples=[[{ 0 }]])
> {code}