wuchong commented on a change in pull request #13300:
URL: https://github.com/apache/flink/pull/13300#discussion_r512736775



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -197,8 +294,121 @@ object TemporalJoinUtil {
     rowtimeJoin
   }
 
-  def isRowTimeTemporalJoinConditionCall(rexCall: RexCall): Boolean = {
-    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
> 3
+  def isRowTimeTemporalTableJoinCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY, 
PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 5
+  }
+
+  def isRowTimeTemporalFunctionJoinCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 3
   }
 
+  def isTemporalFunctionJoin(rexBuilder: RexBuilder, joinInfo: JoinInfo): 
Boolean = {
+    val nonEquiJoinRex = joinInfo.getRemaining(rexBuilder)
+    var isTemporalFunctionJoin: Boolean = false
+    val visitor = new RexVisitorImpl[Unit](true) {
+      override def visitCall(call: RexCall): Unit = {
+        if (isTemporalFunctionCon(call)) {
+          isTemporalFunctionJoin = true
+        } else {
+          super.visitCall(call)
+        }
+      }
+    }
+    nonEquiJoinRex.accept(visitor)
+    isTemporalFunctionJoin
+  }
+
+  def isTemporalFunctionCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION &&
+      (rexCall.operands.length == 2 || rexCall.operands.length == 3)
+  }
+
+  def validateAndExtractTemporalFunctionCondition(

Review comment:
       ```suggestion
     def validateTemporalFunctionCondition(
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
+import org.apache.flink.table.planner.utils.TableTestUtil
+import org.apache.flink.types.Row
+
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.sql.Timestamp
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class LegacyTemporalJoinITCase(state: StateBackendMode)

Review comment:
       Rename the class name to the file name `TemporalTableFunctionJoinITCase`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -197,8 +294,121 @@ object TemporalJoinUtil {
     rowtimeJoin
   }
 
-  def isRowTimeTemporalJoinConditionCall(rexCall: RexCall): Boolean = {
-    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
> 3
+  def isRowTimeTemporalTableJoinCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY, 
PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 5
+  }
+
+  def isRowTimeTemporalFunctionJoinCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 3
   }
 
+  def isTemporalFunctionJoin(rexBuilder: RexBuilder, joinInfo: JoinInfo): 
Boolean = {
+    val nonEquiJoinRex = joinInfo.getRemaining(rexBuilder)
+    var isTemporalFunctionJoin: Boolean = false
+    val visitor = new RexVisitorImpl[Unit](true) {
+      override def visitCall(call: RexCall): Unit = {
+        if (isTemporalFunctionCon(call)) {
+          isTemporalFunctionJoin = true
+        } else {
+          super.visitCall(call)
+        }
+      }
+    }
+    nonEquiJoinRex.accept(visitor)
+    isTemporalFunctionJoin
+  }
+
+  def isTemporalFunctionCon(rexCall: RexCall): Boolean = {
+    //(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    //(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
+    rexCall.getOperator == TEMPORAL_JOIN_CONDITION &&
+      (rexCall.operands.length == 2 || rexCall.operands.length == 3)
+  }
+
+  def validateAndExtractTemporalFunctionCondition(
+      call: RexCall,
+      leftTimeAttribute: Option[RexNode],
+      rightTimeAttribute: Option[RexNode],

Review comment:
       Can be `RexNode` instead of `Option`? 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
##########
@@ -84,7 +84,7 @@ class LogicalCorrelateToJoinFromTemporalTableFunctionRule
 
     val cluster = logicalCorrelate.getCluster
 
-    new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode)
+    val join = new GetTemporalTableFunctionCall(cluster.getRexBuilder, 
leftNode)

Review comment:
       The `join` value is never used. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -94,13 +115,13 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
   }
 
   private def validateRightPrimaryKey(
+      join: FlinkLogicalJoin,
       rightJoinKeyExpression: Seq[RexNode],
       rightPrimaryKeyInputRefs: Option[Seq[RexNode]]): Unit = {
 
     if (rightPrimaryKeyInputRefs.isEmpty) {
-      throw new ValidationException("Event-Time Temporal Table Join requires 
both" +
-        s" primary key and row time attribute in versioned table," +
-        s" but no primary key can be found.")
+      throw new ValidationException(s"Temporal Table Join requires primary key 
in versioned" +
+        s" table, but no primary key can be found in 
${RelOptUtil.toString(join)}.")

Review comment:
       Put the plan tree in a new line. 
   ```scala
         throw new ValidationException(
           "Temporal Table Join requires primary key in versioned table, " +
             s"but no primary key can be found. The physical plan 
is:\n${RelOptUtil.toString(join)}\n")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to