[
https://issues.apache.org/jira/browse/FLINK-29804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergey Nuyanzin updated FLINK-29804:
------------------------------------
Description:
It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
and leads to issues in a number of tests like {{SetOperatorsTest}},
{{CorrelateTest}}, {{TemporalTableFunctionJoinTest}} and
probably some integration tests
An example of failure
{noformat}
org.apache.flink.table.api.TableException: Cannot generate a valid execution
plan for the given query:
FlinkLogicalJoin(condition=[true], joinType=[inner])
:- FlinkLogicalCalc(select=[c])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
+- FlinkLogicalCalc(select=[d], where=[>(e, 20)])
+-
FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982)
at
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896)
at
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658)
at
org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are
not enough rules to produce a node with desired properties:
convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL ->
BATCH_PHYSICAL]
There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the
relevant part of the original plan is as follows
377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]], condition=[true],
joinType=[inner]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows,
1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id = 356
FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]):
rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network,
0.0 memory}, id = 357
FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]],
table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a,
b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows,
1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346
FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d],
where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0
io, 0.0 network, 0.0 memory}, id = 363
FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]],
invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0,
cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 349
Sets:
Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346
rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0
network, 0.0 memory}
rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390
rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0
network, 0.0 memory}
Set#7, type: RecordType(VARCHAR(2147483647) c)
rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381
rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c),
rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
0.0 memory}
rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392
rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c),
rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network,
0.0 memory}
Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e)
rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377
rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0
network, 0.0 memory}
rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
Set#9, type: RecordType(VARCHAR(2147483647) d)
rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384
rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e,
20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network,
0.0 memory}
rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e,
20)), rowcount=1.0, cumulative cost={inf}
rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
rowcount=1.0, cumulative cost={inf}
rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
rowcount=1.0, cumulative cost={inf}
rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null
rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
rowcount=1.0, cumulative cost={inf}
rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
rowcount=1.0, cumulative cost={inf}
Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)
rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386
rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner),
rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu,
3.600000001E9 io, 0.0 network, 0.0 memory}
rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null
rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]),
rowcount=1.0E8, cumulative cost={inf}
rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
d,build=right), rowcount=1.0E8, cumulative cost={inf}
Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster6{
label="Set 6 RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c)";
rel346
[label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
rel390
[label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"]
subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"]
}
subgraph cluster7{
label="Set 7 RecordType(VARCHAR(2147483647) c)";
rel381
[label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8,
cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
rel392
[label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8,
cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"]
subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"]
}
subgraph cluster8{
label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)";
rel377
[label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"]
subset394
[label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red]
}
subgraph cluster9{
label="Set 9 RecordType(VARCHAR(2147483647) d)";
rel384
[label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e,
20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
rel395
[label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e,
20)\nrows=1.0, cost={inf}",shape=box]
rel399
[label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0,
cost={inf}",shape=box]
rel403
[label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0,
cost={inf}",shape=box]
subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"]
subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"]
subset398
[label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"]
subset396 -> subset398; }
subgraph cluster10{
label="Set 10 RecordType(VARCHAR(2147483647) c,
VARCHAR(2147483647) d)";
rel386
[label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8,
cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
rel389
[label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8,
cost={inf}",shape=box]
rel400
[label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
d,build=right\nrows=1.0E8, cost={inf}",shape=box]
subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"]
subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"]
}
root -> subset388;
subset380 -> rel346[color=blue];
subset391 -> rel390[color=blue];
subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue];
subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue];
subset383 -> rel377[color=blue];
subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue];
subset396 -> rel395; rel395 -> subset394;
subset398 -> rel399; rel399 -> subset396;
subset398 -> rel403; rel403 -> subset396;
subset387 -> rel386[color=blue]; rel386 ->
subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"];
subset388 -> rel389; rel389 -> subset387;
subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 ->
subset398[label="1"];
}
at
org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
at
org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533)
at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
... 55 more
{noformat}
was:
It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
and leads to issues in a number of tests like {{SetOperatorsTest}},
{{CorrelateTest}}, {{TemporalTableFunctionJoinTest}} and
probably some integration tests
> Support conversion of Correlate to Join if the correlation variable is unused
> (introduced in Calcite 1.28)
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-29804
> URL: https://issues.apache.org/jira/browse/FLINK-29804
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.17.0
> Reporter: Sergey Nuyanzin
> Priority: Major
>
> It was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
> and leads to issues in a number of tests like {{SetOperatorsTest}},
> {{CorrelateTest}}, {{TemporalTableFunctionJoinTest}}
> and probably some integration tests
> An example of failure
> {noformat}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution
> plan for the given query:
> FlinkLogicalJoin(condition=[true], joinType=[inner])
> :- FlinkLogicalCalc(select=[c])
> : +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog,
> default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a,
> b, c])
> +- FlinkLogicalCalc(select=[d], where=[>(e, 20)])
> +-
> FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
> rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> at
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982)
> at
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896)
> at
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658)
> at
> org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
> at
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There
> are not enough rules to produce a node with desired properties:
> convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL ->
> BATCH_PHYSICAL]
> There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the
> relevant part of the original plan is as follows
> 377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
> rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
> Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]
> Original rel:
> FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]],
> condition=[true], joinType=[inner]): rowcount = 1.0E8, cumulative cost =
> {1.0E8 rows, 1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id
> = 356
> FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]):
> rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0
> network, 0.0 memory}, id = 357
>
> FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]],
> table=[[default_catalog, default_database, MyTable, source:
> [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative
> cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346
> FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d],
> where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0
> io, 0.0 network, 0.0 memory}, id = 363
>
> FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]],
> invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)],
> rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0,
> cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
> 349
> Sets:
> Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
> rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346
>
> rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
> c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0
> network, 0.0 memory}
> rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390
>
> rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog,
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
> c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0
> network, 0.0 memory}
> Set#7, type: RecordType(VARCHAR(2147483647) c)
> rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381
>
> rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c),
> rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0
> network, 0.0 memory}
> rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392
>
> rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c),
> rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network,
> 0.0 memory}
> Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e)
> rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377
>
> rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
> d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io,
> 0.0 network, 0.0 memory}
> rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
> Set#9, type: RecordType(VARCHAR(2147483647) d)
> rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384
>
> rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e,
> 20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network,
> 0.0 memory}
> rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
>
> rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e,
> 20)), rowcount=1.0, cumulative cost={inf}
>
> rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
> rowcount=1.0, cumulative cost={inf}
>
> rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
> rowcount=1.0, cumulative cost={inf}
> rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null
>
> rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]),
> rowcount=1.0, cumulative cost={inf}
>
> rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast),
> rowcount=1.0, cumulative cost={inf}
> Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)
> rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386
>
> rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner),
> rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu,
> 3.600000001E9 io, 0.0 network, 0.0 memory}
> rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null
>
> rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]),
> rowcount=1.0E8, cumulative cost={inf}
>
> rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
> d,build=right), rowcount=1.0E8, cumulative cost={inf}
> Graphviz:
> digraph G {
> root [style=filled,label="Root"];
> subgraph cluster6{
> label="Set 6 RecordType(INTEGER a, BIGINT b,
> VARCHAR(2147483647) c)";
> rel346
> [label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog,
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
> c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> rel390
> [label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog,
> default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b,
> c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"]
> subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"]
> }
> subgraph cluster7{
> label="Set 7 RecordType(VARCHAR(2147483647) c)";
> rel381
> [label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8,
> cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> rel392
> [label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8,
> cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"]
> subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"]
> }
> subgraph cluster8{
> label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)";
> rel377
> [label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647)
> d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"]
> subset394
> [label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red]
> }
> subgraph cluster9{
> label="Set 9 RecordType(VARCHAR(2147483647) d)";
> rel384
> [label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e,
> 20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0
> memory}",color=blue,shape=box]
> rel395
> [label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e,
> 20)\nrows=1.0, cost={inf}",shape=box]
> rel399
> [label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0,
> cost={inf}",shape=box]
> rel403
> [label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0,
> cost={inf}",shape=box]
> subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"]
> subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"]
> subset398
> [label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"]
> subset396 -> subset398; }
> subgraph cluster10{
> label="Set 10 RecordType(VARCHAR(2147483647) c,
> VARCHAR(2147483647) d)";
> rel386
> [label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8,
> cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network,
> 0.0 memory}",color=blue,shape=box]
> rel389
> [label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8,
> cost={inf}",shape=box]
> rel400
> [label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c,
> d,build=right\nrows=1.0E8, cost={inf}",shape=box]
> subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"]
> subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"]
> }
> root -> subset388;
> subset380 -> rel346[color=blue];
> subset391 -> rel390[color=blue];
> subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue];
> subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue];
> subset383 -> rel377[color=blue];
> subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue];
> subset396 -> rel395; rel395 -> subset394;
> subset398 -> rel399; rel399 -> subset396;
> subset398 -> rel403; rel403 -> subset396;
> subset387 -> rel386[color=blue]; rel386 ->
> subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"];
> subset388 -> rel389; rel389 -> subset387;
> subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 ->
> subset398[label="1"];
> }
> at
> org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
> at
> org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533)
> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
> ... 55 more
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)