[
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-11070:
-----------------------------------
Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned
stale-major)
Priority: Minor (was: Major)
This issue was labeled "stale-major" 7 days ago and has not received any updates so
it is being deprioritized. If this ticket is actually Major, please raise the
priority and ask a committer to assign you the issue or revive the public
discussion.
> Add stream-stream non-window cross join
> ---------------------------------------
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: Hequn Cheng
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, we don't reorder joins and instead rely on the order provided by
> the user. This is fine in most cases, but it limits the set of supported SQL
> queries.
> Example:
> {code:scala}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime,
>   'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime,
>   'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime,
>   'proctime.proctime)
> val sqlQuery =
>   """
>     |SELECT t1.a, t3.b
>     |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>     |WHERE t1.a = t3.a AND t1.a = t2.a
>     |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join, which is not
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution
> plan for the given query:
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> {code}
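To illustrate why the FROM order in the quoted query ends up as a cross join, here is a minimal sketch in plain Scala. It is a simplified model that assumes a left-deep join built strictly in FROM-clause order, not the planner's actual logic. `JoinOrderSketch`, `EquiPredicate`, `joinSteps` and `hasCrossJoin` are hypothetical names introduced only for this example.

```scala
// Simplified model of a left-deep join plan built in FROM-clause order.
// This is NOT the planner's actual logic; all names here are hypothetical.
object JoinOrderSketch {
  // An equality predicate between two table aliases, e.g. t1.a = t3.a.
  final case class EquiPredicate(left: String, right: String)

  // For each join step, returns the newly joined alias and whether at least
  // one predicate connects it to the aliases joined so far.
  // A value of false means that step has no join condition (a cross join).
  def joinSteps(
      fromOrder: Seq[String],
      predicates: Seq[EquiPredicate]): Seq[(String, Boolean)] =
    fromOrder.indices.drop(1).map { i =>
      val joined = fromOrder.take(i).toSet
      val next = fromOrder(i)
      val connected = predicates.exists { p =>
        (p.left == next && joined.contains(p.right)) ||
        (p.right == next && joined.contains(p.left))
      }
      (next, connected)
    }

  // True if any join step in the given order has no connecting predicate.
  def hasCrossJoin(
      fromOrder: Seq[String],
      predicates: Seq[EquiPredicate]): Boolean =
    joinSteps(fromOrder, predicates).exists { case (_, connected) => !connected }

  def main(args: Array[String]): Unit = {
    // WHERE t1.a = t3.a AND t1.a = t2.a
    val predicates = Seq(EquiPredicate("t1", "t3"), EquiPredicate("t1", "t2"))
    // Order from the quoted query: t2 is joined to t3 with no direct predicate.
    println(joinSteps(Seq("t3", "t2", "t1"), predicates))
    // Listing t1 first gives every step a connecting predicate.
    println(joinSteps(Seq("t1", "t2", "t3"), predicates))
  }
}
```

Under this model, listing `MyTable t1` first in the FROM clause would give each join a condition, which matches the issue's point that the result currently depends on the order the user writes.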
> To support more queries, it would be useful to have a non-window cross join
> on streams. We can start with a simple version, for example by calling
> forceNonParallel() on the connectOperator in `DataStreamJoin` when the join is
> a cross join. Performance may be poor, but this works fine if both tables in
> the cross join are small.
> We can do some optimizations later, such as broadcasting the smaller side.
> Any suggestions are greatly appreciated.
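The semantics of the simple version proposed in the issue can be sketched without Flink. The following is a hypothetical model, assuming append-only inputs (no retractions) and a single non-parallel instance that sees both streams. It is not the `DataStreamJoin` implementation, and `CrossJoinSketch`, `onLeft` and `onRight` are names made up for this example.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of a non-windowed stream-stream cross join
// running in a single (non-parallel) instance. Not the DataStreamJoin code.
final class CrossJoinSketch[L, R] {
  // Every row seen so far is retained, so state grows with both inputs.
  private val leftRows = ArrayBuffer.empty[L]
  private val rightRows = ArrayBuffer.empty[R]

  // A new left row is paired with every right row seen so far.
  def onLeft(row: L): Seq[(L, R)] = {
    leftRows += row
    rightRows.map(r => (row, r)).toList
  }

  // A new right row is paired with every left row seen so far.
  def onRight(row: R): Seq[(L, R)] = {
    rightRows += row
    leftRows.map(l => (l, row)).toList
  }
}

object CrossJoinSketchDemo {
  def main(args: Array[String]): Unit = {
    val join = new CrossJoinSketch[Int, String]
    // Interleaved arrivals; together the emitted pairs form the full
    // cross product of {1, 2} and {"x", "y"}.
    val emitted =
      join.onLeft(1) ++ join.onRight("x") ++ join.onLeft(2) ++ join.onRight("y")
    emitted.foreach(println)
  }
}
```

Because every row from both sides must be kept and all of it has to be visible in one place, this model also shows why the issue expects the approach to be acceptable only when both inputs are small.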
--
This message was sent by Atlassian Jira
(v8.3.4#803005)