[ 
https://issues.apache.org/jira/browse/FLINK-24239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-24239:
--------------------------------
    Description: 
This ticket is from the [mailing 
list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].

Currently in event time temporal join when join keys are from an array, map or 
row, an exception will be thrown saying "Currently the join key in Temporal 
Table Join can not be empty". This is quite confusing for users as they've 
already set the join keys.

Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
issue.
{code:scala}
@Test
def myTest(): Unit = {
  tEnv.executeSql(
    """
      |CREATE TABLE A (
      |  a MAP<STRING NOT NULL, INT>,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql(
    """
      |CREATE TABLE B (
      |  id INT,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql("SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b 
ON A.a['ID'] = id").print()
}
{code}
The exception stack is
{code:java}
org.apache.flink.table.api.ValidationException: Currently the join key in 
Temporal Table Join can not be empty.

        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1704)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:807)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1289)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:744)
        at 
org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{code}
However if we change the event time temporal join to a lookup join it will be 
good. This is because in the temporal_join_rewrite optimization phase, lookup 
joins and event time temporal joins will go through different rules.
 * Lookup joins will go through 
LogicalCorrelateToJoinFromLookupTableRuleWithFilter (or without filter) which 
directly changes the correlate node to a join node.
 * Event time temporal joins, however, will go through 
LogicalCorrelateToJoinFromTemporalTableRuleWithFilter (or without filter). In 
this rule their join keys will be checked and extracted from {{joinInfo}}. The 
join keys of {{JoinInfo}} are limited to pure input references (see 
{{RelOptUtil#splitJoinCondition}}) thus causing this problem.

Currently users can create a view as a workaround. For example
{code:sql}
CREATE VIEW myView AS SELECT A.a['ID'] AS id, ts FROM A;
SELECT * FROM myView AS a LEFT JOIN B FOR SYSTEM_TIME AS OF a.ts AS b ON a.id = 
b.id;
{code}
but this is sort of inconvenient.

  was:
This ticket is from the [mailing 
list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].

Currently in event time temporal join when join keys are from an array, map or 
row, an exception will be thrown saying "Currently the join key in Temporal 
Table Join can not be empty". This is quite confusing for users as they've 
already set the join keys.

Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
issue.
{code:scala}
@Test
def myTest(): Unit = {
  tEnv.executeSql(
    """
      |CREATE TABLE A (
      |  a MAP<STRING NOT NULL, INT>,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql(
    """
      |CREATE TABLE B (
      |  id INT,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql("SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b 
ON A.a['ID'] = id").print()
}
{code}
The exception stack is
{code:java}
org.apache.flink.table.api.ValidationException: Currently the join key in 
Temporal Table Join can not be empty.

        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1704)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:807)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1289)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:744)
        at 
org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{code}
However if we change the event time temporal join to a lookup join it will be 
good. This is because in the temporal_join_rewrite optimization phase, lookup 
joins and event time temporal joins will go through different rules.
 * Lookup joins will go through 
LogicalCorrelateToJoinFromLookupTableRuleWithFilter (or without filter) which 
directly changes the correlate node to a join node.
 * Event time temporal joins, however, will go through 
LogicalCorrelateToJoinFromTemporalTableRuleWithFilter (or without filter). In 
this rule their join keys will be checked and extracted from {{joinInfo}}. The 
join keys of {{JoinInfo}} are limited to pure input references (see 
{{RelOptUtil#splitJoinCondition}}) thus causing this problem.


> Event time temporal join should support values from array, map, row, etc. as 
> join key
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-24239
>                 URL: https://issues.apache.org/jira/browse/FLINK-24239
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.6, 1.13.3, 1.15.0, 1.14.1
>            Reporter: Caizhi Weng
>            Priority: Major
>
> This ticket is from the [mailing 
> list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].
> Currently in event time temporal join when join keys are from an array, map 
> or row, an exception will be thrown saying "Currently the join key in 
> Temporal Table Join can not be empty". This is quite confusing for users as 
> they've already set the join keys.
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
> issue.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   tEnv.executeSql(
>     """
>       |CREATE TABLE A (
>       |  a MAP<STRING NOT NULL, INT>,
>       |  ts TIMESTAMP(3),
>       |  WATERMARK FOR ts AS ts
>       |) WITH (
>       |  'connector' = 'values'
>       |)
>       |""".stripMargin)
>   tEnv.executeSql(
>     """
>       |CREATE TABLE B (
>       |  id INT,
>       |  ts TIMESTAMP(3),
>       |  WATERMARK FOR ts AS ts
>       |) WITH (
>       |  'connector' = 'values'
>       |)
>       |""".stripMargin)
>   tEnv.executeSql("SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS 
> b ON A.a['ID'] = id").print()
> }
> {code}
> The exception stack is
> {code:java}
> org.apache.flink.table.api.ValidationException: Currently the join key in 
> Temporal Table Join can not be empty.
>       at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
>       at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>       at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.immutable.Range.foreach(Range.scala:160)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1704)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:807)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1289)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:744)
>       at 
> org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:109)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>       at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runners.Suite.runChild(Suite.java:128)
>       at org.junit.runners.Suite.runChild(Suite.java:27)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}
> However if we change the event time temporal join to a lookup join it will be 
> good. This is because in the temporal_join_rewrite optimization phase, lookup 
> joins and event time temporal joins will go through different rules.
>  * Lookup joins will go through 
> LogicalCorrelateToJoinFromLookupTableRuleWithFilter (or without filter) which 
> directly changes the correlate node to a join node.
>  * Event time temporal joins, however, will go through 
> LogicalCorrelateToJoinFromTemporalTableRuleWithFilter (or without filter). In 
> this rule their join keys will be checked and extracted from {{joinInfo}}. 
> The join keys of {{JoinInfo}} are limited to pure input references (see 
> {{RelOptUtil#splitJoinCondition}}) thus causing this problem.
> Currently users can create a view as a workaround. For example
> {code:sql}
> CREATE VIEW myView AS SELECT A.a['ID'] AS id, ts FROM A;
> SELECT * FROM myView AS a LEFT JOIN B FOR SYSTEM_TIME AS OF a.ts AS b ON a.id 
> = b.id;
> {code}
> but this is sort of inconvenient.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to