beyond1920 commented on a change in pull request #16739:
URL: https://github.com/apache/flink/pull/16739#discussion_r686670399



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
##########
@@ -38,40 +38,40 @@ class StreamPhysicalIntervalJoinRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: FlinkLogicalJoin = call.rel(0)
-    val joinRowType = join.getRowType
 
-    // TODO support SEMI/ANTI join
-    if (!join.getJoinType.projectsRight) {
+    if (!satisfyIntervalJoin(join)) {
       return false
     }
 
-    val (windowBounds, _) = extractWindowBounds(join)
+    // validate the join
+    val windowBounds = extractWindowBounds(join)._1.get
 
-    if (windowBounds.isDefined) {
-      if (windowBounds.get.isEventTime) {
-        val leftTimeAttributeType = join.getLeft.getRowType
-          .getFieldList
-          .get(windowBounds.get.getLeftTimeIdx).getType
-        val rightTimeAttributeType = join.getRight.getRowType
-          .getFieldList
-          .get(windowBounds.get.getRightTimeIdx).getType
-        if (leftTimeAttributeType.getSqlTypeName != 
rightTimeAttributeType.getSqlTypeName) {
-          throw new ValidationException(
-            String.format("Interval join with rowtime attribute requires same 
rowtime types," +
-              " but the types are %s and %s.",
+    if (windowBounds.isEventTime) {
+      val leftTimeAttributeType = join.getLeft.getRowType
+        .getFieldList
+        .get(windowBounds.getLeftTimeIdx).getType
+      val rightTimeAttributeType = join.getRight.getRowType
+        .getFieldList
+        .get(windowBounds.getRightTimeIdx).getType
+      if (leftTimeAttributeType.getSqlTypeName != 
rightTimeAttributeType.getSqlTypeName) {
+        throw new ValidationException(
+          String.format("Interval join with rowtime attribute requires same 
rowtime types," +
+            " but the types are %s and %s.",
             leftTimeAttributeType.toString, rightTimeAttributeType.toString))
-        }
-        true
-      } else {
-        // Check that no event-time attributes are in the input because the 
processing time window
-        // join does not correctly hold back watermarks.
-        // We rely on projection pushdown to remove unused attributes before 
the join.
-        !joinRowType.getFieldList.exists(f => 
FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
       }
     } else {
-      // the given join does not have valid window bounds. We cannot translate 
it.
-      false
+      // Check that no event-time attributes are in the input because the 
processing time window
+      // join does not correctly hold back watermarks.
+      // We rely on projection pushdown to remove unused attributes before the 
join.
+      val joinRowType = join.getRowType
+      val containsRowTime = joinRowType.getFieldList.exists(f => 
isRowtimeIndicatorType(f.getType))
+      if (containsRowTime) {
+        throw new TableException(

Review comment:
       This exception can actually be triggered by users in some corner cases (for 
example, `IntervalJoinTest#testNoRowtimeAttributeInResult`).
   Before this PR, this case would also throw the following `TableException`, 
because the logical join could not be translated to any kind of physical join.
   `org.apache.flink.table.api.TableException: Cannot generate a valid 
execution plan for the given query: 
   
   FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS 
proctime, rowtime, a0, b0, c0, PROCTIME_MATERIALIZE(proctime0) AS proctime0, 
rowtime0])
   +- FlinkLogicalJoin(condition=[AND(=($0, $5), >=($3, -($8, 5000:INTERVAL 
SECOND)), <=($3, $8))], joinType=[inner])
      :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
   
   This exception indicates that the query uses an unsupported SQL feature.
   Please check the documentation for the set of currently supported SQL 
features.
   `
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to