[ 
https://issues.apache.org/jira/browse/FLINK-24835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440890#comment-17440890
 ] 

JING ZHANG commented on FLINK-24835:
------------------------------------

[~xuyangzhong] Thanks for reporting this problem.

The case is interesting, the sql looks like a interval join query at first 
glance, but it is a regular join because right side is a unbounded aggregate 
which has no time attribute field.

In theory, the query would translated into regular join instead of interval 
join because the time attribute of right side would be materialized because it 
is an unbounded aggregate.

For this case, both left side and right side of join  should be materialized. 
But there is a bug in `RelTimeIndicatorConverter#visitJoin`. The bug would 
mistaken regard the join as interval join, so it skip materialized time 
attribute of left side which leads an exception in `StreamPhysicalJoinRule`.

I would fix this bug soon, thanks again for reporting this bug.

> "group by" in the interval join will throw a exception
> ------------------------------------------------------
>
>                 Key: FLINK-24835
>                 URL: https://issues.apache.org/jira/browse/FLINK-24835
>             Project: Flink
>          Issue Type: Bug
>            Reporter: xuyang
>            Priority: Minor
>
> Can reproduce this bug by the following code added into 
> IntervalJoinTest.scala:
> {code:java}
> @Test
> def testSemiIntervalJoinWithSimpleConditionAndGroup(): Unit = {
>   val sql =
>     """
>       |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
>       | SELECT t2.a FROM MyTable2 t2
>       |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime 
> + INTERVAL '5' MINUTE
>       |   GROUP BY t2.a
>       |)
>     """.stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The exception is :
> {code:java}
> java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>     at 
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:64)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>     at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:64)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:60)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:165)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:309)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:894)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:790)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:591)
>     at 
> org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest.testSemiIntervalJoinWithSimpleConditionAndGroup(IntervalJoinTest.scala:76)
>  {code}
> It is caused by that the agg casts the time indicator type to the common 
> timestamp.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to