[
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374838#comment-17374838
]
Timo Walther commented on FLINK-10734:
--------------------------------------
[~Leonard Xu] I guess this is still an issue? I updated the component for now.
Or shall we close it?
> Temporal joins on heavily filtered tables might fail in planning
> ----------------------------------------------------------------
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.7.0
> Reporter: Piotr Nowojski
> Priority: Minor
>
> Following query:
> {code}
> val sqlQuery =
> """
> |SELECT
> | o.amount * r.rate AS amount
> |FROM
> | Orders AS o,
> | LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory",
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
> "Rates",
> tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime,
> 'currency))
> {code}
> Will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key
> is supported. Found [] in [InnerJoin(where:
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount,
> rowtime, currency, rate, rowtime0))]
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that filtering condition {{('currency === "Euro")}} interferes
> with joining condition, simplifying it to nothing. Note how top
> {{LogicalFilter(condition=[=($3, $1)])}} changes during optimising and
> finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
> LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5,
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
> LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3],
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5,
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
> FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4,
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'],
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
> FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'],
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
> FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)