[
https://issues.apache.org/jira/browse/FLINK-24239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412962#comment-17412962
]
Caizhi Weng commented on FLINK-24239:
-------------------------------------
cc [~Leonard Xu]
> Event time temporal join should support values from array, map, row, etc. as
> join key
> -------------------------------------------------------------------------------------
>
> Key: FLINK-24239
> URL: https://issues.apache.org/jira/browse/FLINK-24239
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.12.6, 1.13.3, 1.15.0, 1.14.1
> Reporter: Caizhi Weng
> Priority: Major
>
> This ticket is from the [mailing
> list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].
> Currently in event time temporal join when join keys are from an array, map
> or row, an exception will be thrown saying "Currently the join key in
> Temporal Table Join can not be empty". This is quite confusing for users as
> they've already set the join keys.
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this
> issue.
> {code:scala}
> @Test
> def myTest(): Unit = {
> tEnv.executeSql(
> """
> |CREATE TABLE A (
> | a MAP<STRING NOT NULL, INT>,
> | ts TIMESTAMP(3),
> | WATERMARK FOR ts AS ts
> |) WITH (
> | 'connector' = 'values'
> |)
> |""".stripMargin)
> tEnv.executeSql(
> """
> |CREATE TABLE B (
> | id INT,
> | ts TIMESTAMP(3),
> | WATERMARK FOR ts AS ts
> |) WITH (
> | 'connector' = 'values'
> |)
> |""".stripMargin)
> tEnv.executeSql("SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS
> b ON A.a['ID'] = id").print()
> }
> {code}
> The exception stack is
> {code:java}
> org.apache.flink.table.api.ValidationException: Currently the join key in
> Temporal Table Join can not be empty.
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1704)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:807)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1289)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:744)
> at
> org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:109)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}
> However if we change the event time temporal join to a lookup join it will be
> good. This is because in the temporal_join_rewrite optimization phase, lookup
> joins and event time temporal joins will go through different rules.
> * Lookup joins will go through
> LogicalCorrelateToJoinFromLookupTableRuleWithFilter (or without filter) which
> directly changes the correlate node to a join node.
> * Event time temporal joins, however, will go through
> LogicalCorrelateToJoinFromTemporalTableRuleWithFilter (or without filter). In
> this rule their join keys will be checked and extracted fromĀ {{joinInfo}}.
> The join keys of {{JoinInfo}} are limited to pure input references (see
> {{RelOptUtil#splitJoinCondition}}) thus causing this problem.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)