[ https://issues.apache.org/jira/browse/FLINK-25447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he reassigned FLINK-25447:
----------------------------------

    Assignee: Zheng yunhong

> batch query cannot generate a plan when writing a sorted view into multiple sinks
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-25447
>                 URL: https://issues.apache.org/jira/browse/FLINK-25447
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.2
>            Reporter: lincoln lee
>            Assignee: Zheng yunhong
>            Priority: Major
>             Fix For: 1.16.0
>
>
> A batch query that writes a sorted view into multiple sinks fails with a CannotPlanException, as the following test shows:
> {code}
>   @Test
>   def testSortedResultIntoMultiSinks(): Unit = {
>     util.tableEnv.executeSql(
>       s"""
>          |CREATE TABLE Src (
>          |  `a` INT,
>          |  `b` BIGINT,
>          |  `c` STRING,
>          |  `d` STRING,
>          |  `e` STRING
>          |) WITH (
>          |  'connector' = 'values',
>          |  'bounded' = 'true'
>          |)
>        """.stripMargin)
>     val query = "SELECT * FROM Src order by c"
>     val table = util.tableEnv.sqlQuery(query)
>     util.tableEnv.registerTable("sortedTable", table)
>     util.tableEnv.executeSql(
>       s"""
>          |CREATE TABLE sink1 (
>          |  `a` INT,
>          |  `b` BIGINT,
>          |  `c` STRING
>          |) WITH (
>          |  'connector' = 'filesystem',
>          |  'format' = 'testcsv',
>          |  'path' = '/tmp/test'
>          |)
>        """.stripMargin)
>     util.tableEnv.executeSql(
>       s"""
>          |CREATE TABLE sink2 (
>          |  `a` INT,
>          |  `b` BIGINT,
>          |  `c` STRING,
>          |  `d` STRING
>          |) WITH (
>          |  'connector' = 'filesystem',
>          |  'format' = 'testcsv',
>          |  'path' = '/tmp/test'
>          |)
>       """.stripMargin)
>     val stmtSet = util.tableEnv.createStatementSet()
>     stmtSet.addInsertSql(
>       "insert into sink1 select a, b, listagg(d) from sortedTable group by a, b")
>     stmtSet.addInsertSql(
>       "insert into sink2 select a, b, c, d from sortedTable")
>     util.verifyExecPlan(stmtSet)
>   }
> {code}
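>
> Either INSERT appears to plan fine on its own; only the multi-sink statement set fails. A hedged way to see this outside the test harness is to explain each query alone and then the combined statement set (a minimal sketch, assuming the same tables are registered in a batch TableEnvironment; on 1.14.2 the second explain is where the exception surfaces):
> {code}
> import org.apache.flink.table.api.TableEnvironment
>
> object ExplainPlans {
>   def compare(tEnv: TableEnvironment): Unit = {
>     // Planned in isolation, the sorted view feeds a single sink; this explain succeeds.
>     println(tEnv.explainSql("insert into sink2 select a, b, c, d from sortedTable"))
>
>     // Planned together, the sorted view is shared by two sinks and the optimizer
>     // splits it into an intermediate table; this explain throws the TableException
>     // shown below.
>     val stmtSet = tEnv.createStatementSet()
>     stmtSet.addInsertSql(
>       "insert into sink1 select a, b, listagg(d) from sortedTable group by a, b")
>     stmtSet.addInsertSql("insert into sink2 select a, b, c, d from sortedTable")
>     println(stmtSet.explain())
>   }
> }
> {code}
> Verifying the exec plan for the statement set fails with: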
> {code}
>   org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
>   LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, c, d])
>   +- LogicalProject(inputs=[0..3])
>      +- LogicalTableScan(table=[[IntermediateRelTable_0]])
>   This exception indicates that the query uses an unsupported SQL feature.
>   Please check the documentation for the set of currently supported SQL features.
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>       at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>       at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>       at scala.collection.Iterator.foreach(Iterator.scala:937)
>       at scala.collection.Iterator.foreach$(Iterator.scala:937)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>       at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:88)
>       at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:59)
>       at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:47)
>       at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:47)
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:47)
>       at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:309)
>       at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:888)
>       at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:857)
>       at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:640)
>       at org.apache.flink.table.planner.plan.batch.sql.TableSinkTest.testMultiSinksSplitOnSortedResult(TableSinkTest.scala:188)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>       at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>   Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=LOGICAL, FlinkRelDistributionTraitDef=any, sort=[].
>   Missing conversion is LogicalProject[convention: NONE -> LOGICAL]
>   There is 1 empty subset: rel#816:RelSubset#27.LOGICAL.any.[2 ASC-nulls-first], the relevant part of the original plan is as follows
>   806:LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
>     162:LogicalTableScan(subset=[rel#805:RelSubset#26.NONE.any.[]], table=[[IntermediateRelTable_0]])
>   Root: rel#810:RelSubset#28.LOGICAL.any.[]
>   Original rel:
>   LogicalSort(subset=[rel#129:RelSubset#1.LOGICAL.any.[2 ASC-nulls-first]], sort0=[$2], dir0=[ASC-nulls-first]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 3.684136148790473E10 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 127
>     LogicalTableScan(subset=[rel#126:RelSubset#0.NONE.any.[]], table=[[default_catalog, default_database, SmallTable]]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1
>   Sets:
>   Set#26, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, VARCHAR(2147483647) e)
>       rel#805:RelSubset#26.NONE.any.[], best=null
>               rel#162:LogicalTableScan.NONE.any.[](table=[IntermediateRelTable_0]), rowcount=1.0E8, cumulative cost={inf}
>       rel#813:RelSubset#26.LOGICAL.any.[], best=rel#812
>               rel#812:FlinkLogicalIntermediateTableScan.LOGICAL.any.[](table=[IntermediateRelTable_0],fields=a, b, c, d, e), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}
>   Set#27, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d)
>       rel#807:RelSubset#27.NONE.any.[2 ASC-nulls-first], best=null
>               rel#806:LogicalProject.NONE.any.[2 ASC-nulls-first](input=RelSubset#805,inputs=0..3), rowcount=1.0E8, cumulative cost={inf}
>       rel#815:RelSubset#27.NONE.any.[], best=null
>               rel#806:LogicalProject.NONE.any.[2 ASC-nulls-first](input=RelSubset#805,inputs=0..3), rowcount=1.0E8, cumulative cost={inf}
>               rel#814:LogicalCalc.NONE.any.[](input=RelSubset#805,expr#0..4={inputs},proj#0..3={exprs}), rowcount=1.0E8, cumulative cost={inf}
>       rel#816:RelSubset#27.LOGICAL.any.[2 ASC-nulls-first], best=null
>       rel#819:RelSubset#27.LOGICAL.any.[], best=rel#818
>               rel#818:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#813,select=a, b, c, d), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}
>   Set#28, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d)
>       rel#809:RelSubset#28.NONE.any.[], best=null
>               rel#808:LogicalSink.NONE.any.[](input=RelSubset#807,table=default_catalog.default_database.sink2,fields=a, b, c, d), rowcount=1.0E8, cumulative cost={inf}
>       rel#810:RelSubset#28.LOGICAL.any.[], best=null
>               rel#811:AbstractConverter.LOGICAL.any.[](input=RelSubset#809,convention=LOGICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0E8, cumulative cost={inf}
>               rel#817:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#816,table=default_catalog.default_database.sink2,fields=a, b, c, d), rowcount=1.0E8, cumulative cost={inf}
>   Graphviz:
>   digraph G {
>       root [style=filled,label="Root"];
>       subgraph cluster26{
>               label="Set 26 RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, VARCHAR(2147483647) e)";
>               rel162 [label="rel#162:LogicalTableScan\ntable=[IntermediateRelTable_0]\nrows=1.0E8, cost={inf}",shape=box]
>               rel812 [label="rel#812:FlinkLogicalIntermediateTableScan\ntable=[IntermediateRelTable_0],fields=a, b, c, d, e\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>               subset805 [label="rel#805:RelSubset#26.NONE.any.[]"]
>               subset813 [label="rel#813:RelSubset#26.LOGICAL.any.[]"]
>       }
>       subgraph cluster27{
>               label="Set 27 RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d)";
>               rel806 [label="rel#806:LogicalProject\ninput=RelSubset#805,inputs=0..3\nrows=1.0E8, cost={inf}",shape=box]
>               rel814 [label="rel#814:LogicalCalc\ninput=RelSubset#805,expr#0..4={inputs},proj#0..3={exprs}\nrows=1.0E8, cost={inf}",shape=box]
>               rel818 [label="rel#818:FlinkLogicalCalc\ninput=RelSubset#813,select=a, b, c, d\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>               subset807 [label="rel#807:RelSubset#27.NONE.any.[2 ASC-nulls-first]"]
>               subset815 [label="rel#815:RelSubset#27.NONE.any.[]"]
>               subset816 [label="rel#816:RelSubset#27.LOGICAL.any.[2 ASC-nulls-first]",color=red]
>               subset819 [label="rel#819:RelSubset#27.LOGICAL.any.[]"]
>               subset819 -> subset816;
>               subset815 -> subset807;
>       }
>       subgraph cluster28{
>               label="Set 28 RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d)";
>               rel808 [label="rel#808:LogicalSink\ninput=RelSubset#807,table=default_catalog.default_database.sink2,fields=a, b, c, d\nrows=1.0E8, cost={inf}",shape=box]
>               rel811 [label="rel#811:AbstractConverter\ninput=RelSubset#809,convention=LOGICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
>               rel817 [label="rel#817:FlinkLogicalSink\ninput=RelSubset#816,table=default_catalog.default_database.sink2,fields=a, b, c, d\nrows=1.0E8, cost={inf}",shape=box]
>               subset809 [label="rel#809:RelSubset#28.NONE.any.[]"]
>               subset810 [label="rel#810:RelSubset#28.LOGICAL.any.[]"]
>       }
>       root -> subset810;
>       subset805 -> rel162;
>       subset813 -> rel812[color=blue];
>       subset807 -> rel806; rel806 -> subset805;
>       subset815 -> rel814; rel814 -> subset805;
>       subset819 -> rel818[color=blue]; rel818 -> subset813[color=blue];
>       subset809 -> rel808; rel808 -> subset807;
>       subset810 -> rel811; rel811 -> subset809;
>       subset810 -> rel817; rel817 -> subset816;
>   }
>       at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
>       at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
>       at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
>       at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>       ... 55 more
> {code}
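>
> Reading the trace: BatchCommonSubGraphBasedOptimizer splits the plan at the shared sorted view into IntermediateRelTable_0, and the sink2 branch then requests a RelSubset carrying the sort trait [2 ASC-nulls-first] (rel#816, the empty subset) that nothing above the intermediate scan can satisfy, hence the CannotPlanException. Until this is fixed, one possible workaround (a hedged sketch, not verified against this issue) is to submit the two INSERTs as independent jobs instead of one statement set, so each query is optimized on its own and the sorted sub-graph is never split:
> {code}
> import org.apache.flink.table.api.TableEnvironment
>
> object Workaround {
>   // Assumes the Src, sortedTable, sink1 and sink2 definitions from the test above.
>   def writeSinksSeparately(tEnv: TableEnvironment): Unit = {
>     // Each executeSql call plans and runs a single-sink query, so the multi-sink
>     // block split never happens (at the cost of scanning and sorting Src twice).
>     tEnv.executeSql(
>       "insert into sink1 select a, b, listagg(d) from sortedTable group by a, b").await()
>     tEnv.executeSql(
>       "insert into sink2 select a, b, c, d from sortedTable").await()
>   }
> }
> {code}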



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
