[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656831#comment-15656831 ]
Alexander Shoshin commented on FLINK-4541:
------------------------------------------
The NOT IN operator does not work with nested queries because the logical plan
contains a cross join:
{code}
...
LogicalJoin(condition=[true], joinType=[inner])
  EnumerableTableScan(table=[[Words]])
  LogicalAggregate(group=[{}], agg#0=[COUNT()], agg#1=[COUNT($0)])
...
{code}
Flink does not have a {{Cross Join}} translation rule at the moment. As we
discussed with [~fhueske], adding a general rule is not a good idea for the
following reason: with support for cross joins, it becomes possible to
accidentally write queries that run for days and produce vast amounts of data,
filling up all disks.
A better solution might be to add a translation rule that converts a {{Cross
Join}} (with {{condition=\[true]}}) into a {{DataSetSingleRowCross}} only if
one input of the join is a global aggregation (as in the logical plan above).
The {{DataSetSingleRowCross}} would be implemented as a {{MapFunction}} with
a {{BroadcastSet}} input (for the single-record input) and would internally
call a code-generated {{JoinFunction}}.
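
To illustrate the idea, here is a minimal sketch of a map function with a broadcast input, written against the DataSet Scala API. It is not the proposed {{DataSetSingleRowCross}} implementation: the names {{Word}}, {{Counts}}, {{SingleRowCross}} and the broadcast name "singleRow" are hypothetical, the code-generated {{JoinFunction}} is replaced by a plain tuple, and it assumes the broadcast set contains exactly one record.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical row types for illustration; not taken from the Flink code base.
case class Word(word: String)
case class Counts(total: Long, nonNull: Long)

// Pairs every incoming record with the single broadcast record.
// A real rule would call a code-generated JoinFunction instead of building a tuple.
class SingleRowCross extends RichMapFunction[Word, (Word, Counts)] {
  private var single: Counts = _

  override def open(parameters: Configuration): Unit = {
    // Assumption: the broadcast set holds exactly one record
    // (the result of the global aggregation).
    single = getRuntimeContext.getBroadcastVariable[Counts]("singleRow").get(0)
  }

  override def map(value: Word): (Word, Counts) = (value, single)
}

object SingleRowCrossSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-ins for the table scan and the global aggregation in the plan above.
    val words = env.fromElements(Word("hello"), Word("ciao"))
    val counts = env.fromElements(Counts(1L, 1L))

    words
      .map(new SingleRowCross)
      .withBroadcastSet(counts, "singleRow")
      .print()
  }
}
```

Because the single-record side is broadcast to every parallel instance of the map, the output has as many records as the other input, so the result size stays bounded, unlike a general cross join.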
> Support for SQL NOT IN operator
> -------------------------------
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2",
>     tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql(
>     "SELECT word, SUM(frequency) FROM WordCount " +
>     "WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>   table.toDataSet[WC].print()
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)