[ 
https://issues.apache.org/jira/browse/FLINK-29804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-29804:
------------------------------------
    Description: 
It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
 and leads to issues in a number of tests like {{SetOperatorsTest}}, 
{{CorrelateTest}}, {{SetOperatorsTest}}, {{TemporalTableFunctionJoinTest}} and 
probably some integration tests

An example of failure
{noformat}

org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query: 

FlinkLogicalJoin(condition=[true], joinType=[inner])
:- FlinkLogicalCalc(select=[c])
:  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+- FlinkLogicalCalc(select=[d], where=[>(e, 20)])
   +- 
FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
 rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
        at scala.collection.immutable.List.foreach(List.scala:388)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658)
        at 
org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> 
BATCH_PHYSICAL]
There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the 
relevant part of the original plan is as follows
377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
 rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])

Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]], condition=[true], 
joinType=[inner]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 
1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id = 356
  FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]): 
rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 
0.0 memory}, id = 357
    
FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]], 
table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, 
b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 
1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346
  FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d], 
where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 
io, 0.0 network, 0.0 memory}, id = 363
    FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]], 
invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], 
rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0, 
cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 349

Sets:
Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
        rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346
                
rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
 default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 
network, 0.0 memory}
        rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390
                
rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog,
 default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 
network, 0.0 memory}
Set#7, type: RecordType(VARCHAR(2147483647) c)
        rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381
                
rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c), 
rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 
0.0 memory}
        rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392
                
rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c), 
rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 
0.0 memory}
Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e)
        rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377
                
rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
 d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 
network, 0.0 memory}
        rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
Set#9, type: RecordType(VARCHAR(2147483647) d)
        rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384
                
rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e, 
20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 
0.0 memory}
        rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
                
rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e,
 20)), rowcount=1.0, cumulative cost={inf}
                
rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
 rowcount=1.0, cumulative cost={inf}
                
rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
 rowcount=1.0, cumulative cost={inf}
        rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null
                
rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
 rowcount=1.0, cumulative cost={inf}
                
rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
 rowcount=1.0, cumulative cost={inf}
Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)
        rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386
                
rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner),
 rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu, 
3.600000001E9 io, 0.0 network, 0.0 memory}
        rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null
                
rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]),
 rowcount=1.0E8, cumulative cost={inf}
                
rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
 d,build=right), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
        root [style=filled,label="Root"];
        subgraph cluster6{
                label="Set 6 RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c)";
                rel346 
[label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                rel390 
[label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"]
                subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"]
        }
        subgraph cluster7{
                label="Set 7 RecordType(VARCHAR(2147483647) c)";
                rel381 
[label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8, 
cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                rel392 
[label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8, 
cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"]
                subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"]
        }
        subgraph cluster8{
                label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)";
                rel377 
[label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
 d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"]
                subset394 
[label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red]
        }
        subgraph cluster9{
                label="Set 9 RecordType(VARCHAR(2147483647) d)";
                rel384 
[label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e, 
20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                rel395 
[label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e, 
20)\nrows=1.0, cost={inf}",shape=box]
                rel399 
[label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0,
 cost={inf}",shape=box]
                rel403 
[label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0,
 cost={inf}",shape=box]
                subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"]
                subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"]
                subset398 
[label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"]
                subset396 -> subset398; }
        subgraph cluster10{
                label="Set 10 RecordType(VARCHAR(2147483647) c, 
VARCHAR(2147483647) d)";
                rel386 
[label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8,
 cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
                rel389 
[label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8,
 cost={inf}",shape=box]
                rel400 
[label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
 d,build=right\nrows=1.0E8, cost={inf}",shape=box]
                subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"]
                subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"]
        }
        root -> subset388;
        subset380 -> rel346[color=blue];
        subset391 -> rel390[color=blue];
        subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue];
        subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue];
        subset383 -> rel377[color=blue];
        subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue];
        subset396 -> rel395; rel395 -> subset394;
        subset398 -> rel399; rel399 -> subset396;
        subset398 -> rel403; rel403 -> subset396;
        subset387 -> rel386[color=blue]; rel386 -> 
subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"];
        subset388 -> rel389; rel389 -> subset387;
        subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 -> 
subset398[label="1"];
}
        at 
org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
        at 
org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
        ... 55 more

{noformat}


  was:
It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
 and leads to issues in a number of tests like {{SetOperatorsTest}}, 
{{CorrelateTest}}, {{SetOperatorsTest}}, {{TemporalTableFunctionJoinTest}} and 
probably some integration tests



> Support convertion of Correlate to Join if correlation variable is unused 
> introduced in Calcite 1.28
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29804
>                 URL: https://issues.apache.org/jira/browse/FLINK-29804
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.17.0
>            Reporter: Sergey Nuyanzin
>            Priority: Major
>
> It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
>  and leads to issues in a number of tests like {{SetOperatorsTest}}, 
> {{CorrelateTest}}, {{SetOperatorsTest}}, {{TemporalTableFunctionJoinTest}} 
> and probably some integration tests
> An example of failure
> {noformat}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> FlinkLogicalJoin(condition=[true], joinType=[inner])
> :- FlinkLogicalCalc(select=[c])
> :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> +- FlinkLogicalCalc(select=[d], where=[>(e, 20)])
>    +- 
> FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
>  rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>       at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>       at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>       at scala.collection.Iterator.foreach(Iterator.scala:937)
>       at scala.collection.Iterator.foreach$(Iterator.scala:937)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
>       at scala.collection.immutable.List.foreach(List.scala:388)
>       at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658)
>       at 
> org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
>       at 
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There 
> are not enough rules to produce a node with desired properties: 
> convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> 
> BATCH_PHYSICAL]
> There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the 
> relevant part of the original plan is as follows
> 377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
>  rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
> Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]
> Original rel:
> FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]], 
> condition=[true], joinType=[inner]): rowcount = 1.0E8, cumulative cost = 
> {1.0E8 rows, 1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id 
> = 356
>   FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]): 
> rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 
> network, 0.0 memory}, id = 357
>     
> FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]],
>  table=[[default_catalog, default_database, MyTable, source: 
> [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative 
> cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346
>   FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d], 
> where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 
> io, 0.0 network, 0.0 memory}, id = 363
>     
> FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]], 
> invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], 
> rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0, 
> cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 
> 349
> Sets:
> Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
>       rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346
>               
> rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
>  default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
> c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 
> network, 0.0 memory}
>       rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390
>               
> rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog,
>  default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
> c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 
> network, 0.0 memory}
> Set#7, type: RecordType(VARCHAR(2147483647) c)
>       rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381
>               
> rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c), 
> rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 
> network, 0.0 memory}
>       rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392
>               
> rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c),
>  rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 
> 0.0 memory}
> Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e)
>       rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377
>               
> rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
>  d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 
> 0.0 network, 0.0 memory}
>       rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
> Set#9, type: RecordType(VARCHAR(2147483647) d)
>       rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384
>               
> rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e,
>  20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 
> 0.0 memory}
>       rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
>               
> rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e,
>  20)), rowcount=1.0, cumulative cost={inf}
>               
> rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
>  rowcount=1.0, cumulative cost={inf}
>               
> rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
>  rowcount=1.0, cumulative cost={inf}
>       rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null
>               
> rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
>  rowcount=1.0, cumulative cost={inf}
>               
> rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
>  rowcount=1.0, cumulative cost={inf}
> Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)
>       rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386
>               
> rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner),
>  rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu, 
> 3.600000001E9 io, 0.0 network, 0.0 memory}
>       rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null
>               
> rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]),
>  rowcount=1.0E8, cumulative cost={inf}
>               
> rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
>  d,build=right), rowcount=1.0E8, cumulative cost={inf}
> Graphviz:
> digraph G {
>       root [style=filled,label="Root"];
>       subgraph cluster6{
>               label="Set 6 RecordType(INTEGER a, BIGINT b, 
> VARCHAR(2147483647) c)";
>               rel346 
> [label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog, 
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
> c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               rel390 
> [label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog, 
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, 
> c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"]
>               subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"]
>       }
>       subgraph cluster7{
>               label="Set 7 RecordType(VARCHAR(2147483647) c)";
>               rel381 
> [label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8, 
> cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               rel392 
> [label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8, 
> cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"]
>               subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"]
>       }
>       subgraph cluster8{
>               label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)";
>               rel377 
> [label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
>  d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"]
>               subset394 
> [label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red]
>       }
>       subgraph cluster9{
>               label="Set 9 RecordType(VARCHAR(2147483647) d)";
>               rel384 
> [label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e, 
> 20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 
> memory}",color=blue,shape=box]
>               rel395 
> [label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e, 
> 20)\nrows=1.0, cost={inf}",shape=box]
>               rel399 
> [label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0,
>  cost={inf}",shape=box]
>               rel403 
> [label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0,
>  cost={inf}",shape=box]
>               subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"]
>               subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"]
>               subset398 
> [label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"]
>               subset396 -> subset398; }
>       subgraph cluster10{
>               label="Set 10 RecordType(VARCHAR(2147483647) c, 
> VARCHAR(2147483647) d)";
>               rel386 
> [label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8,
>  cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 
> 0.0 memory}",color=blue,shape=box]
>               rel389 
> [label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8,
>  cost={inf}",shape=box]
>               rel400 
> [label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
>  d,build=right\nrows=1.0E8, cost={inf}",shape=box]
>               subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"]
>               subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"]
>       }
>       root -> subset388;
>       subset380 -> rel346[color=blue];
>       subset391 -> rel390[color=blue];
>       subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue];
>       subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue];
>       subset383 -> rel377[color=blue];
>       subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue];
>       subset396 -> rel395; rel395 -> subset394;
>       subset398 -> rel399; rel399 -> subset396;
>       subset398 -> rel403; rel403 -> subset396;
>       subset387 -> rel386[color=blue]; rel386 -> 
> subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"];
>       subset388 -> rel389; rel389 -> subset387;
>       subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 -> 
> subset398[label="1"];
> }
>       at 
> org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
>       at 
> org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>       ... 55 more
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to