[
https://issues.apache.org/jira/browse/FLINK-18409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322994#comment-17322994
]
Flink Jira Bot commented on FLINK-18409:
----------------------------------------
This issue is assigned but has not received an update in 7 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> The FilterJoinRule.FILTER_ON_JOIN optimizer rule causes 'Join With a
> Temporal Table' to not work properly
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18409
> URL: https://issues.apache.org/jira/browse/FLINK-18409
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Yuhao Zhao
> Assignee: Yuhao Zhao
> Priority: Major
> Labels: stale-assigned
>
> When using 'left join xxx for system_time as of xxx', the
> FilterJoinRule.FILTER_ON_JOIN optimizer rule causes a problem: the eval
> function of the LookupTableSource no longer matches the expected signature,
> and other problems may follow.
> * Flink Version:1.10
> * Blink Planner
> AsyncTableFunction implementation:
> {code:java}
> public class RedisAsyncLookupFunction extends AsyncTableFunction<Row> {
>     // key is the redis key
>     public void eval(CompletableFuture<Collection<Row>> resultFuture, String key) {
>         .....
>     }
> }
> {code}
> {color:#de350b}Flink SQL-01:{color}
> {code:java}
> select
> t1.col_key as col_key,
> t1.col_val as col_val,
> t2.PID as pid,
> t2.DD_APDATE,
> t2.SID,
> t2.TRX_AMOUNT
> from coll_source_1 as t1
> left join redis_dimension_1 FOR SYSTEM_TIME AS OF t1.proctime as t2
> on t1.col_key = t2.PID where t2.SID = '2'
> {code}
> {color:#de350b}Error-01{color}
> {code:java}
> Expected: eval(java.util.concurrent.CompletableFuture, java.lang.String,
> java.lang.String)
> Actual: eval(java.util.concurrent.CompletableFuture, java.lang.String)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin$$anonfun$13.apply(CommonLookupJoin.scala:436)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin$$anonfun$13.apply(CommonLookupJoin.scala:431)
> at scala.Option.getOrElse(Option.scala:121)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.checkEvalMethodSignature(CommonLookupJoin.scala:431)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.translateToPlanInternal(CommonLookupJoin.scala:263)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:99)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:40)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:40)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> {code}
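The mismatch in Error-01 can be illustrated with a small reflective check. This is only a sketch of the idea, not Flink's actual checkEvalMethodSignature logic; the class EvalSignatureCheck, the stand-in SingleKeyLookup, and the helper hasEval are hypothetical names introduced for illustration. It assumes the planner looks for an eval method whose parameters are the future followed by one argument per lookup key.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class EvalSignatureCheck {

    // Hypothetical stand-in for the lookup function in the report:
    // it accepts the future plus a single String key.
    public static class SingleKeyLookup {
        public void eval(CompletableFuture<Collection<Object>> resultFuture, String key) {
            resultFuture.complete(Collections.emptyList());
        }
    }

    // Returns true if clazz has a public method named eval with exactly
    // the given parameter types.
    public static boolean hasEval(Class<?> clazz, Class<?>... expected) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval")
                    && Arrays.equals(m.getParameterTypes(), expected)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // One lookup key (the join key only): a matching eval exists.
        System.out.println(hasEval(SingleKeyLookup.class,
                CompletableFuture.class, String.class));
        // Two lookup keys (join key plus a constant taken from the filter):
        // no matching eval, which corresponds to the Expected/Actual message.
        System.out.println(hasEval(SingleKeyLookup.class,
                CompletableFuture.class, String.class, String.class));
    }
}
```

Under that assumption, the extra java.lang.String in the "Expected" signature would correspond to the constant '2' from the where clause being treated as a second lookup key.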
>
> {color:#de350b}Flink SQL-02{color}
> {code:java}
> select
> t1.col_key as col_key,
> t1.col_val as col_val,
> t2.PID as pid,
> t2.DD_APDATE,
> t2.SID,
> t2.TRX_AMOUNT
> from coll_source_1 as t1
> left join redis_dimension_1 FOR SYSTEM_TIME AS OF t1.proctime as t2
> on t1.col_key = t2.PID where CAST(t2.SID AS INTEGER) = 2
> {code}
> {color:#de350b}Error-02{color}
> {code:java}
> java.lang.RuntimeException: Error while applying rule
> StreamExecSnapshotOnCalcTableScanRule, args
> [rel#260:FlinkLogicalJoin.LOGICAL.any.None:
> 0.false.UNKNOWN(left=RelSubset#254,right=RelSubset#259,condition==($0,
> $2),joinType=inner), rel#253:FlinkLogicalCalc.LOGICAL.any.None:
> 0.false.UNKNOWN(input=RelSubset#252,select=col_key, col_val),
> rel#258:FlinkLogicalSnapshot.LOGICAL.any.None:
> 0.false.UNKNOWN(input=RelSubset#257,period=$cor0.proctime),
> rel#256:FlinkLogicalCalc.LOGICAL.any.None:
> 0.false.UNKNOWN(input=RelSubset#255,select=PID, DD_APDATE, SID,
> TRX_AMOUNT,where==(CAST(SID), 2)),
> rel#216:FlinkLogicalTableSourceScan.LOGICAL.any.None:
> 0.false.UNKNOWN(table=[default_catalog, default_database, redis_dimension_1,
> source: [RedisLookupTableSource(PID, DD_APDATE, SEQNO, SID, UID, btest,
> APPID, CLIENT_ID, TRX_AMOUNT)]],fields=PID, DD_APDATE, SEQNO, SID, UID,
> btest, APPID, CLIENT_ID, TRX_AMOUNT)] at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at
> com.huawei.fi.rtd.flinksql.connector.redis.RedisDimensionTest.testCollectionJoinRedis(RedisDimensionTest.java:71)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: scala.MatchError: (CAST($3):INTEGER,2) (of class scala.Tuple2)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.org$apache$flink$table$planner$plan$nodes$common$CommonLookupJoin$$extractConstantField(CommonLookupJoin.scala:601)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.extractConstantFieldsFromEquiCondition(CommonLookupJoin.scala:591)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.analyzeLookupKeys(CommonLookupJoin.scala:551)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin.<init>(CommonLookupJoin.scala:129)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.<init>(StreamExecLookupJoin.scala:49)
> at
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecLookupJoinRule$.org$apache$flink$table$planner$plan$rules$physical$stream$StreamExecLookupJoinRule$$doTransform(StreamExecLookupJoinRule.scala:88)
> at
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecLookupJoinRule$SnapshotOnCalcTableScanRule.transform(StreamExecLookupJoinRule.scala:65)
> at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.onMatch(CommonLookupJoinRule.scala:172)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
> ... 45 more
> {code}
> I debugged the program. The root cause is the FilterJoinRule.FILTER_ON_JOIN
> rule, which pushes the filter from the where clause into the join.
> I think this rule may need to be disabled when joining with a temporal table.
>
>
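One reading of the two plans, sketched in plain Java with in-memory data. Everything here (the class FilterPushDownSketch, the DIM map, and the method names) is hypothetical and only models the shape of the problem. It assumes that pushing the filter on t2.SID below the join turns the constant into an additional lookup key, which is consistent with joinType=inner and the where condition on the Calc under the snapshot in Error-02.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FilterPushDownSketch {

    // Hypothetical dimension data: PID -> SID (stand-in for redis_dimension_1).
    static final Map<String, String> DIM = new HashMap<>();
    static {
        DIM.put("k1", "2");
        DIM.put("k2", "3");
    }

    // Plan A: left join on the key only, then apply WHERE t2.SID = '2'.
    // The lookup takes one key, matching eval(future, key).
    public static List<String> joinThenFilter(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String sid = DIM.get(key);   // null when nothing matches (left join)
            if ("2".equals(sid)) {       // the filter drops null and non-matching rows
                out.add(key + ":" + sid);
            }
        }
        return out;
    }

    // Plan B: the filter is pushed into the lookup side, so the constant
    // becomes a second lookup key and the join behaves like an inner join.
    public static List<String> filterInsideLookup(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String sid = lookup(key, "2");
            if (sid != null) {
                out.add(key + ":" + sid);
            }
        }
        return out;
    }

    // Two-key lookup: join key plus the constant SID.
    static String lookup(String key, String sid) {
        String found = DIM.get(key);
        return sid.equals(found) ? found : null;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3");
        System.out.println(joinThenFilter(keys));
        System.out.println(filterInsideLookup(keys));
    }
}
```

Both methods return the same rows, so the rewrite itself is not wrong on this data. The difference is that plan B needs a lookup that accepts two keys, which a function with a single-key eval does not offer.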
--
This message was sent by Atlassian Jira
(v8.3.4#803005)