[
https://issues.apache.org/jira/browse/FLINK-18409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142875#comment-17142875
]
Jark Wu commented on FLINK-18409:
---------------------------------
I think there are two problems here:
1) Currently, it is not possible to hint the planner to look up on a given
subset of fields/keys instead of looking up on all the equality fields.
I think this can be resolved by supporting PRIMARY KEY on the lookup source.
The PRIMARY KEY can be used as an optimization hint: if the join keys
contain the primary key, then only the primary key is used as the lookup key
(other join fields are excluded). This is also good for caching. We can
create another issue to discuss this.
2) Exception when {{CAST(t2.SID AS INTEGER) = 2}}.
I think this is a bug in {{CommonLookupJoin#extractConstantField}}. If you are
interested in this, I can assign this issue to you.
> The FilterJoinRule.FILTER_ON_JOIN optimizer rule cause 'Join With a
> Temporal Table' not working properly
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18409
> URL: https://issues.apache.org/jira/browse/FLINK-18409
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Yuhao Zhao
> Priority: Major
>
> When using 'left join xxx FOR SYSTEM_TIME AS OF xxx', the
> FilterJoinRule.FILTER_ON_JOIN optimizer rule causes the eval function of the
> LookupTableSource not to match, and may cause other problems.
> * Flink Version: 1.10
> * Blink Planner
> AsyncTableFunction implementation:
> {code:java}
> public class RedisAsyncLookupFunction extends AsyncTableFunction<Row> {
>     // key is the redis key
>     public void eval(CompletableFuture<Collection<Row>> resultFuture, String key) {
>         // ...
>     }
> }
> {code}
> {color:#de350b}Flink SQL-01:{color}
> {code:java}
> select
>   t1.col_key as col_key,
>   t1.col_val as col_val,
>   t2.PID as pid,
>   t2.DD_APDATE,
>   t2.SID,
>   t2.TRX_AMOUNT
> from coll_source_1 as t1
> left join redis_dimension_1 FOR SYSTEM_TIME AS OF t1.proctime as t2
>   on t1.col_key = t2.PID
> where t2.SID = '2'
> {code}
> {color:#de350b}Error-01{color}
> {code:java}
> Expected: eval(java.util.concurrent.CompletableFuture, java.lang.String, java.lang.String)
> Actual: eval(java.util.concurrent.CompletableFuture, java.lang.String)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin$$anonfun$13.apply(CommonLookupJoin.scala:436)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin$$anonfun$13.apply(CommonLookupJoin.scala:431)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.checkEvalMethodSignature(CommonLookupJoin.scala:431)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.translateToPlanInternal(CommonLookupJoin.scala:263)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:99)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> {code}
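The Expected/Actual mismatch above follows from the filter push-down: once FilterJoinRule.FILTER_ON_JOIN moves {{t2.SID = '2'}} into the join, the planner derives two lookup keys (PID plus the constant SID) and looks for a two-key {{eval}} via reflection. Below is a minimal, self-contained sketch of that kind of reflective signature check (plain JDK only; the class and method names are illustrative stand-ins, not Flink's actual {{CommonLookupJoin#checkEvalMethodSignature}} code):

```java
import java.util.concurrent.CompletableFuture;

public class EvalSignatureCheck {

    // Illustrative stand-in for the reporter's RedisAsyncLookupFunction:
    // it declares only a single-key eval method.
    static class SingleKeyLookupFunction {
        public void eval(CompletableFuture<?> resultFuture, String key) {
            // lookup body elided
        }
    }

    // Returns true if cls declares eval(CompletableFuture, String x numKeys),
    // roughly what a reflective signature check would require.
    static boolean hasEval(Class<?> cls, int numKeys) {
        Class<?>[] params = new Class<?>[numKeys + 1];
        params[0] = CompletableFuture.class;
        for (int i = 1; i <= numKeys; i++) {
            params[i] = String.class;
        }
        try {
            cls.getMethod("eval", params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // One lookup key (join on PID only): the declared eval matches.
        System.out.println(hasEval(SingleKeyLookupFunction.class, 1)); // true
        // Two lookup keys (PID plus the pushed-down constant SID):
        // no matching eval, hence the Expected vs Actual error above.
        System.out.println(hasEval(SingleKeyLookupFunction.class, 2)); // false
    }
}
```

Declaring an additional overload {{eval(CompletableFuture, String, String)}} on the lookup function would make such a check pass, though that only works around the push-down behavior rather than fixing it.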
>
> {color:#de350b}Flink SQL-02{color}
> {code:java}
> select
>   t1.col_key as col_key,
>   t1.col_val as col_val,
>   t2.PID as pid,
>   t2.DD_APDATE,
>   t2.SID,
>   t2.TRX_AMOUNT
> from coll_source_1 as t1
> left join redis_dimension_1 FOR SYSTEM_TIME AS OF t1.proctime as t2
>   on t1.col_key = t2.PID
> where CAST(t2.SID AS INTEGER) = 2
> {code}
> {color:#de350b}Error-02{color}
> {code:java}
> java.lang.RuntimeException: Error while applying rule StreamExecSnapshotOnCalcTableScanRule, args [
>   rel#260:FlinkLogicalJoin.LOGICAL.any.None: 0.false.UNKNOWN(left=RelSubset#254,right=RelSubset#259,condition==($0, $2),joinType=inner),
>   rel#253:FlinkLogicalCalc.LOGICAL.any.None: 0.false.UNKNOWN(input=RelSubset#252,select=col_key, col_val),
>   rel#258:FlinkLogicalSnapshot.LOGICAL.any.None: 0.false.UNKNOWN(input=RelSubset#257,period=$cor0.proctime),
>   rel#256:FlinkLogicalCalc.LOGICAL.any.None: 0.false.UNKNOWN(input=RelSubset#255,select=PID, DD_APDATE, SID, TRX_AMOUNT,where==(CAST(SID), 2)),
>   rel#216:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, redis_dimension_1, source: [RedisLookupTableSource(PID, DD_APDATE, SEQNO, SID, UID, btest, APPID, CLIENT_ID, TRX_AMOUNT)]],fields=PID, DD_APDATE, SEQNO, SID, UID, btest, APPID, CLIENT_ID, TRX_AMOUNT)]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>     at com.huawei.fi.rtd.flinksql.connector.redis.RedisDimensionTest.testCollectionJoinRedis(RedisDimensionTest.java:71)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: scala.MatchError: (CAST($3):INTEGER,2) (of class scala.Tuple2)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.org$apache$flink$table$planner$plan$nodes$common$CommonLookupJoin$$extractConstantField(CommonLookupJoin.scala:601)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.extractConstantFieldsFromEquiCondition(CommonLookupJoin.scala:591)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.analyzeLookupKeys(CommonLookupJoin.scala:551)
>     at org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.<init>(CommonLookupJoin.scala:129)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.<init>(StreamExecLookupJoin.scala:49)
>     at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecLookupJoinRule$.org$apache$flink$table$planner$plan$rules$physical$stream$StreamExecLookupJoinRule$$doTransform(StreamExecLookupJoinRule.scala:88)
>     at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecLookupJoinRule$SnapshotOnCalcTableScanRule.transform(StreamExecLookupJoinRule.scala:65)
>     at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.onMatch(CommonLookupJoinRule.scala:172)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
>     ... 45 more
> {code}
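The {{scala.MatchError: (CAST($3):INTEGER,2)}} at the bottom of this trace is the bug suspected in {{CommonLookupJoin#extractConstantField}}: the equi-condition analysis pattern-matches on a (field reference, literal) operand pair and has no case for a CAST-wrapped reference. A simplified, self-contained sketch of that failure mode (the Expr/FieldRef/Literal/Cast types below are illustrative stand-ins, not Calcite's real RexNode classes):

```java
public class ConstantFieldExtraction {

    // Illustrative stand-ins for Calcite row expressions.
    interface Expr { }
    static class FieldRef implements Expr { final int index; FieldRef(int index) { this.index = index; } }
    static class Literal implements Expr { final Object value; Literal(Object value) { this.value = value; } }
    static class Cast implements Expr { final Expr operand; Cast(Expr operand) { this.operand = operand; } }

    // Mirrors the incomplete match: only (FieldRef, Literal) is handled, so a
    // CAST-wrapped field reference falls through, like the scala.MatchError.
    static int extractConstantField(Expr left, Expr right) {
        if (left instanceof FieldRef && right instanceof Literal) {
            return ((FieldRef) left).index;
        }
        throw new IllegalStateException(
            "MatchError: (" + left.getClass().getSimpleName() + ", "
                + right.getClass().getSimpleName() + ")");
    }

    public static void main(String[] args) {
        // SID = '2' (field index 3 paired with a literal): handled.
        System.out.println(extractConstantField(new FieldRef(3), new Literal("2"))); // 3
        // CAST(SID) = 2: no matching case, so the analysis throws.
        try {
            extractConstantField(new Cast(new FieldRef(3)), new Literal(2));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // MatchError: (Cast, Literal)
        }
    }
}
```

One possible fix would be to recurse into the CAST's operand (or skip such conjuncts) instead of failing the match, but that is a suggestion, not the patch actually applied in Flink.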
> I debugged the program; the root cause is the FilterJoinRule.FILTER_ON_JOIN
> rule, which pushes the where clause into the join.
> I think this rule may need to be disabled when joining with a temporal table.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)