[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706580#comment-16706580
 ] 

Hequn Cheng commented on FLINK-11020:
-------------------------------------

Maybe we can implement the cross join with a parallelism of 1? For example, call 
forceNonParallel() on the connectOperator in `DataStreamJoin` when the join is a 
cross join. We can add optimizations later, such as broadcasting the smaller 
side.
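
To make the idea concrete, here is a minimal, framework-free sketch of what a 
single (non-parallel) cross join task would have to do: with no join key to 
partition on, every row must meet every row of the other input, so one instance 
buffers both sides and pairs each new row with everything seen so far. 
`CrossJoinTask`, `onLeft`, and `onRight` are hypothetical names, not Flink APIs; 
a real operator would presumably keep the buffers in Flink state and also handle 
retractions.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-instance cross join; names are illustrative, not Flink APIs.
final class CrossJoinTask[L, R] {
  // In-memory buffers stand in for operator state (assumption for this sketch).
  private val leftRows = ArrayBuffer.empty[L]
  private val rightRows = ArrayBuffer.empty[R]

  // A new left row is paired with every right row seen so far.
  def onLeft(row: L): Seq[(L, R)] = {
    leftRows += row
    rightRows.map(r => (row, r)).toSeq
  }

  // A new right row is paired with every left row seen so far.
  def onRight(row: R): Seq[(L, R)] = {
    rightRows += row
    leftRows.map(l => (l, row)).toSeq
  }
}
```

Feeding the rows 1 (left), "a" (right), 2 (left) emits (1, "a") and then 
(2, "a"). Broadcasting the smaller side would allow the larger side to stay 
partitioned, at the cost of replicating the smaller buffer to every instance.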

> Reorder joins only to eliminate cross joins 
> --------------------------------------------
>
>                 Key: FLINK-11020
>                 URL: https://issues.apache.org/jira/browse/FLINK-11020
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Priority: Major
>
> Currently, we don't reorder joins and rely on the order provided by the user. 
> This is fine in most cases; however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join, which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.
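
As a rough illustration of "reorder only to eliminate cross joins", the sketch 
below models the example query outside of Calcite. It is not how 
{{JoinPushThroughJoinRule}} works internally; `JoinOrderSketch`, `crossJoins`, 
and `reorderIfNeeded` are hypothetical names, and the model assumes a left-deep 
join tree and predicates that each link exactly two tables.

```scala
// Simplified model, not Calcite's planner: tables are names, and each predicate
// is an unordered pair of tables linked by an equi-join condition.
object JoinOrderSketch {
  type Pred = Set[String]

  // Count the steps of a left-deep join in the given order where the next table
  // shares no predicate with any table already joined (that is, a cross join).
  def crossJoins(order: Seq[String], preds: Set[Pred]): Int =
    order.indices.drop(1).count { i =>
      val joined = order.take(i).toSet
      !preds.exists(p => p.contains(order(i)) && (p - order(i)).subsetOf(joined))
    }

  // Keep the user's order if it has no cross join; otherwise return some
  // permutation without one, if such a permutation exists.
  def reorderIfNeeded(order: Seq[String], preds: Set[Pred]): Option[Seq[String]] =
    if (crossJoins(order, preds) == 0) Some(order)
    else order.permutations.find(o => crossJoins(o, preds) == 0)
}
```

For the query above, the FROM order t3, t2, t1 with the predicates t1.a = t3.a 
and t1.a = t2.a contains one cross join (t3 with t2), while an order that joins 
t1 before t2, such as t3, t1, t2, contains none. An order that already has no 
cross join is returned unchanged, which mirrors the intent of not reordering 
unless necessary.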



