godfreyhe commented on a change in pull request #13299:
URL: https://github.com/apache/flink/pull/13299#discussion_r495764467
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
##########
@@ -34,7 +34,7 @@ import java.util
import scala.collection.JavaConversions._
/**
- * Batch physical RelNode for temporal table join.
+ * Batch physical RelNode for temporal table join that implements by lookup..
Review comment:
nit: redundant period
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -17,19 +17,30 @@
*/
package org.apache.flink.table.planner.plan.rules.logical
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecLookupJoin,
StreamExecTemporalJoin}
-
import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter,
LogicalSnapshot}
-import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef,
RexNode, RexShuttle}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rex._
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.connector.source.LookupTableSource
+import
org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType
+import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
FlinkLogicalTableSourceScan}
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecLookupJoin,
StreamExecTemporalJoin}
+import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable,
TableSourceTable, TimeIndicatorRelDataType}
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.table.sources.LookupableTableSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* The initial temporal table join (FOR SYSTEM_TIME AS OF) is a Correlate,
rewrite it into a Join
* to make join condition can be pushed-down. The join will be translated into
- * [[StreamExecLookupJoin]] in physical and might be translated into
[[StreamExecTemporalJoin]]
- * in the future.
+ * [[StreamExecLookupJoin]] in physical or translated into
[[StreamExecTemporalJoin]].
Review comment:
Please add a comment describing the limitation, e.g. that this rule can only
be used in HepPlanner.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
Review comment:
nit: redundant blank
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
Review comment:
left table -> right table ?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
+ }
+
+ snapshot.getPeriod match {
+ // validate period comes from left table's field
+ case r: RexFieldAccess if
r.getReferenceExpr.isInstanceOf[RexCorrelVariable] &&
+
correlate.getCorrelationId.equals(r.getReferenceExpr.asInstanceOf[RexCorrelVariable].id)
=>
Review comment:
ditto
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
+ }
+
+ snapshot.getPeriod match {
+ // validate period comes from left table's field
+ case r: RexFieldAccess if
r.getReferenceExpr.isInstanceOf[RexCorrelVariable] &&
+
correlate.getCorrelationId.equals(r.getReferenceExpr.asInstanceOf[RexCorrelVariable].id)
=>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field'")
+ }
+ }
+
+ protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput:
RelNode): Boolean = {
+ val isProcessingTime = snapshot.getPeriod.getType match {
+ case t: TimeIndicatorRelDataType if !t.isEventTime => true
+ case _ => false
+ }
+
+ val tableScan = getTableScan(snapshotInput)
+ val snapshotOnLookupSource = tableScan match {
+ case Some(scan) => isTableSourceScan(scan) && isLookupTableSource(scan)
+ case _ => false
+ }
+
+ isProcessingTime && snapshotOnLookupSource
+ }
+
+ private def getTableScan(snapshotInput: RelNode): Option[TableScan] = {
+ snapshotInput match {
+ case tableScan: TableScan
+ => Some(tableScan)
+ // computed column on lookup table
+ case project: LogicalProject if
trimHep(project.getInput).isInstanceOf[TableScan]
Review comment:
remove `if` and call `getTableScan(trimHep(project.getInput))` directly
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
Review comment:
Pass `leftFieldCnt` in as the parameter directly, instead of deriving it from `leftInput`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
Review comment:
nit: redundant blank
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val timeColIndex = leftFieldCnt +
rightFields.indexOf(timeAttributeFields.get(0))
+ val timeColDataType = timeAttributeFields.get(0).getType
+ Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
+ } else {
+ None
+ }
+ }
+
+ protected def extractSnapshotTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexInputRef] = {
+ val leftRowType = leftInput.getRowType
+ val leftFields = leftRowType.getFieldList
+ val periodField = snapshot.getPeriod.asInstanceOf[RexFieldAccess].getField
+ if (leftFields.contains(periodField)) {
+ val index = leftRowType.getFieldList.indexOf(periodField)
+ Some(RexInputRef.of(index, leftRowType))
+ } else {
+ None
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
+
+ val leftRowType = leftInput.getRowType
+ val joinCondition = filterCondition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
+ validateSnapshotInCorrelate(snapshot, correlate)
+
+ val (leftJoinKey, rightJoinKey) = {
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
+ val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val rightJoinKey = joinInfo.rightKeys.map(i => {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
+ rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
+ })
+ (leftJoinKey, rightJoinKey)
+ }
+
+ val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+ .getOrElse(throw new ValidationException("Temporal Table Join requires
time attribute in the " +
+ s"left table, but no row time attribute found."))
+
+ val rexBuilder = correlate.getCluster.getRexBuilder
Review comment:
we can reuse rexBuilder
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacyTemporalJoin.scala
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
Review comment:
move these imports behind `org.apache.flink.`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -244,7 +243,25 @@ class FlinkRelMdUniqueKeys private extends
MetadataHandler[BuiltInMetadata.Uniqu
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
val rankFunColumnIndex =
RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
- if (rankFunColumnIndex < 0) {
+ //TODO current deduplicate on row time is still a Rank,
+ // remove this after support deduplicate on row time
+ val canConvertToDeduplicate: Boolean = {
Review comment:
It would be better to put these changes into a separate commit.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -339,75 +367,50 @@ object StreamExecTemporalJoinToCoProcessTranslator {
var rightTimeAttribute: Option[RexNode] = None
- var rightPrimaryKeyExpression: Option[RexNode] = None
+ var rightPrimaryKeyExpression: Option[Array[RexNode]] = None
override def visitCall(call: RexCall): RexNode = {
if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
return super.visitCall(call)
}
- checkState(
- leftTimeAttribute.isEmpty
- && rightPrimaryKeyExpression.isEmpty
- && rightTimeAttribute.isEmpty,
- "Multiple %s functions in [%s]",
- TEMPORAL_JOIN_CONDITION,
- textualRepresentation)
-
- if (TemporalJoinUtil.isRowtimeCall(call)) {
+ if (TemporalJoinUtil.isRowTimeTemporalJoinConditionCall(call)) {
leftTimeAttribute = Some(call.getOperands.get(0))
rightTimeAttribute = Some(call.getOperands.get(1))
-
- rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(2)))
-
- if (!isRowtimeIndicatorType(rightTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non rowtime timeAttribute [${rightTimeAttribute.get.getType}] " +
- s"used to create TemporalTableFunction")
- }
- if (!isRowtimeIndicatorType(leftTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non rowtime timeAttribute [${leftTimeAttribute.get.getType}] " +
- s"passed as the argument to TemporalTableFunction")
- }
- }
- else if (TemporalJoinUtil.isProctimeCall(call)) {
+ rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(4)))
+ } else {
leftTimeAttribute = Some(call.getOperands.get(0))
- rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(1)))
-
- if (!isProctimeIndicatorType(leftTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non processing timeAttribute [${leftTimeAttribute.get.getType}]
" +
- s"passed as the argument to TemporalTableFunction")
- }
- }
- else {
- throw new IllegalStateException(
- s"Unsupported invocation $call in [$textualRepresentation]")
}
rexBuilder.makeLiteral(true)
}
- private def validateRightPrimaryKey(rightPrimaryKey: RexNode): RexNode = {
- if (joinInfo.rightKeys.size() != 1) {
+ private def validateRightPrimaryKey(rightPrimaryKey: RexNode):
Array[RexNode] = {
+ if (!rightPrimaryKey.isInstanceOf[RexCall] ||
+ rightPrimaryKey.asInstanceOf[RexCall].getOperator !=
TEMPORAL_JOIN_CONDITION_PRIMARY_KEY) {
throw new ValidationException(
- s"Only single column join key is supported. " +
- s"Found ${joinInfo.rightKeys} in [$textualRepresentation]")
- }
- val rightJoinKeyInputReference = joinInfo.rightKeys.get(0) +
rightKeysStartingOffset
+ s"No primary key [${rightPrimaryKey.asInstanceOf[RexCall]}] " +
+ s"defined in versioned table of Event-time temporal table join")
+ }
- val rightPrimaryKeyInputReference = extractInputReference(
+ val rightJoinKeyInputRefs = joinInfo.rightKeys
+ .map(index => index + rightKeysStartingOffset)
+ .toArray
+
+ val rightPrimaryKeyInputRefs = extractInputRefs(
rightPrimaryKey,
textualRepresentation)
- if (rightPrimaryKeyInputReference != rightJoinKeyInputReference) {
+ val primaryKeyContainedInJoinKey = rightPrimaryKeyInputRefs.zipWithIndex
+ .map(r => (r._1, rightJoinKeyInputRefs(r._2)))
Review comment:
use `(ref, index)` instead of `r` in `map`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -244,7 +243,25 @@ class FlinkRelMdUniqueKeys private extends
MetadataHandler[BuiltInMetadata.Uniqu
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
val rankFunColumnIndex =
RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
- if (rankFunColumnIndex < 0) {
+ //TODO current deduplicate on row time is still a Rank,
+ // remove this after support deduplicate on row time
+ val canConvertToDeduplicate: Boolean = {
+ val rankRange = rel.rankRange
+ val isRowNumberType = rel.rankType == RankType.ROW_NUMBER
+ val isLimit1 = rankRange match {
+ case rankRange: ConstantRankRange =>
+ rankRange.getRankStart() == 1 && rankRange.getRankEnd() == 1
+ case _ => false
+ }
+ !rel.outputRankNumber && isRowNumberType && isLimit1
Review comment:
Is `!rel.outputRankNumber` unnecessary? Even if the Rank outputs the rank
number, its value is always 1.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
Review comment:
it's better to add some comments like "do nothing" at the end of this
line
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode, RexShuttle}
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin,
FlinkLogicalRel, FlinkLogicalSnapshot}
+import
org.apache.flink.table.planner.plan.rules.physical.common.CommonTemporalTableJoinRule
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites temporal join with extracted primary key,
Event-time temporal
+ * table join requires primary key and row time attribute of versioned table.
The versioned table
+ * could be a table source or a view only if it contains the unique key and
time attribute.
+ *
+ * <p> Flink support extract the primary key and row time attribute from the
view if the view comes
+ * from [[LogicalRank]] node which can convert to a [[Deduplicate]] node.
+ */
+class TemporalJoinRewriteWithUniqueKeyRule extends RelOptRule(
+ operand(classOf[FlinkLogicalJoin],
+ operand(classOf[FlinkLogicalRel], any()),
+ operand(classOf[FlinkLogicalSnapshot],
+ operand(classOf[FlinkLogicalRel], any()))),
+ "TemporalJoinRewriteWithUniqueKeyRule")
+ with CommonTemporalTableJoinRule {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join = call.rel[FlinkLogicalJoin](0)
+ val snapshot = call.rel[FlinkLogicalSnapshot](2)
+ val snapshotInput = call.rel[FlinkLogicalRel](3)
+
+ val isTemporalJoin = matches(snapshot)
+ val canConvertToLookup = canConvertToLookupJoin(snapshot, snapshotInput)
+ val supportedJoinTypes = Seq(JoinRelType.INNER)
+
+ isTemporalJoin && !canConvertToLookup &&
supportedJoinTypes.contains(join.getJoinType)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join = call.rel[FlinkLogicalJoin](0)
+ val leftInput = call.rel[FlinkLogicalRel](1)
+ val snapshot = call.rel[FlinkLogicalSnapshot](2)
+
+ val joinCondition = join.getCondition
+
+ val newJoinCondition = joinCondition.accept(new RexShuttle {
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION &&
+ isRowTimeTemporalTableJoin(snapshot)) {
+ val snapshotTimeInputRef = call.operands(0)
+ val rightTimeInputRef = call.operands(1)
+ val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
+ val rightJoinKey = call.operands(3).asInstanceOf[RexCall].operands
+
+ val rexBuilder = join.getCluster.getRexBuilder
+ val primaryKeyInputRefs = extractPrimaryKeyInputRefs(leftInput,
snapshot, rexBuilder)
+ if (primaryKeyInputRefs.isEmpty) {
+ throw new ValidationException("Event-Time Temporal Table Join
requires both" +
+ s" primary key and row time attribute in versioned table," +
+ s" but no primary key found.")
+ }
+ TemporalJoinUtil.makeRowTimeTemporalJoinConditionCall(rexBuilder,
snapshotTimeInputRef,
+ rightTimeInputRef, leftJoinKey, rightJoinKey,
primaryKeyInputRefs.get)
+ }
+ else {
+ super.visitCall(call)
+ }
+ }
+ })
+ val rewriteJoin = FlinkLogicalJoin.create(
+ leftInput, snapshot, newJoinCondition, join.getJoinType)
+ call.transformTo(rewriteJoin)
+ }
+
+ private def extractPrimaryKeyInputRefs(
+ leftInput: RelNode,
+ snapshot: FlinkLogicalSnapshot,
+ rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
+ val rightFields = snapshot.getRowType.getFieldList
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
+
+ val uniqueKeys = fmq.getUniqueKeys(snapshot.getInput())
+ val fields = snapshot.getRowType.getFieldList
+
+ if (uniqueKeys != null && uniqueKeys.size() > 0) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ uniqueKeys
+ .filter(_.nonEmpty)
+ .map(_.toArray
+ .map(fields)
+ .map(f => rexBuilder.makeInputRef(
+ f.getType,
+ leftFieldCnt + rightFields.indexOf(f)))
+ .toSeq)
+ .toArray
+ .sortBy(_.length)
+ .headOption
+ } else {
+ None
+ }
+ }
+
+ private def isRowTimeTemporalTableJoin(snapshot: FlinkLogicalSnapshot):
Boolean =
+ snapshot.getPeriod.getType match {
+ case t: TimeIndicatorRelDataType if t.isEventTime => true
+ case _ => false
+ }
+}
+
+object TemporalJoinRewriteWithUniqueKeyRule {
+ val INSTANCE: TemporalJoinRewriteWithUniqueKeyRule = new
TemporalJoinRewriteWithUniqueKeyRule
+}
+
+
Review comment:
nit: redundant blank
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
##########
@@ -17,129 +17,256 @@
*/
package org.apache.flink.table.planner.plan.stream.sql.join
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase}
-
-import org.hamcrest.Matchers.containsString
+import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
-import java.sql.Timestamp
-
+/**
+ * Test temporal join in stream mode.
+ */
class TemporalJoinTest extends TableTestBase {
val util: StreamTableTestUtil = streamTestUtil()
- private val orders = util.addDataStream[(Long, String)](
- "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+ util.addTable(
+ """
+ |CREATE TABLE Orders (
+ | o_amount INT,
+ | o_currency STRING,
+ | o_rowtime TIMESTAMP(3),
+ | o_proctime as PROCTIME(),
+ | WATERMARK FOR o_rowtime AS o_rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistory (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- private val ratesHistory = util.addDataStream[(String, Int, Timestamp)](
- "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistoryWithPK (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- util.addFunction(
- "Rates",
- ratesHistory.createTemporalTableFunction($"rowtime", $"currency"))
+ util.addTable(
+ """
+ |CREATE TABLE RatesOnly (
+ | currency STRING,
+ | rate INT,
+ | proctime AS PROCTIME()
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- private val proctimeOrders = util.addDataStream[(Long, String)](
- "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)
+ util.addTable(
+ " CREATE VIEW DeduplicatedView as SELECT currency, rate, rowtime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime
DESC) AS rowNum " +
+ " FROM RatesHistory" +
+ " ) T " +
+ " WHERE rowNum = 1")
- private val proctimeRatesHistory = util.addDataStream[(String, Int)](
- "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)
+ util.addTable(
+ " CREATE VIEW latestView as SELECT T.currency, T.rate, T.proctime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime
DESC) AS rowNum " +
+ " FROM RatesOnly" +
+ " ) T " +
+ " WHERE T.rowNum = 1")
- util.addFunction(
- "ProctimeRates",
- proctimeRatesHistory.createTemporalTableFunction($"proctime", $"currency"))
+ util.addTable("CREATE VIEW latest_rates AS SELECT currency, LAST_VALUE(rate)
AS rate " +
+ "FROM RatesHistory " +
+ "GROUP BY currency ")
@Test
- def testSimpleJoin(): Unit = {
+ def testEventTimeTemporalJoin(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM Orders AS o, " +
- "LATERAL TABLE (Rates(o.o_rowtime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_rowtime as r " +
+ "on o.o_currency = r.currency"
util.verifyPlan(sqlQuery)
}
@Test
- def testSimpleProctimeJoin(): Unit = {
+ def testEventTimeTemporalJoinWithView(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM ProctimeOrders AS o, " +
- "LATERAL TABLE (ProctimeRates(o.o_proctime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "DeduplicatedView " +
+ "FOR SYSTEM_TIME AS OF o.o_rowtime as r1 " +
+ "on o.o_currency = r1.currency"
util.verifyPlan(sqlQuery)
}
@Test
- def testJoinOnQueryLeft(): Unit = {
- val orders = util.tableEnv.sqlQuery("SELECT * FROM Orders WHERE o_amount >
1000")
- util.tableEnv.createTemporaryView("Orders2", orders)
-
+ def testProcTimeTemporalJoin(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM Orders2 AS o, " +
- "LATERAL TABLE (Rates(o.o_rowtime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "latestView " +
+ "FOR SYSTEM_TIME AS OF o.o_proctime as r1 " +
+ "on o.o_currency = r1.currency"
util.verifyPlan(sqlQuery)
}
- /**
- * Test versioned joins with more complicated query.
- * Important thing here is that we have complex OR join condition
- * and there are some columns that are not being used (are being pruned).
- */
@Test
- def testComplexJoin(): Unit = {
- val util = streamTestUtil()
- util.addDataStream[(String, Int)]("Table3", 't3_comment, 't3_secondary_key)
- util.addDataStream[(Timestamp, String, Long, String, Int)](
- "Orders", 'o_rowtime.rowtime, 'o_comment, 'o_amount, 'o_currency,
'o_secondary_key)
-
- util.addDataStream[(Timestamp, String, String, Int, Int)](
- "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate,
'secondary_key)
- val rates = util.tableEnv
- .sqlQuery("SELECT * FROM RatesHistory WHERE rate > 110")
- .createTemporalTableFunction($"rowtime", $"currency")
- util.addTemporarySystemFunction("Rates", rates)
-
- val sqlQuery =
- "SELECT * FROM " +
- "(SELECT " +
- "o_amount * rate as rate, " +
- "secondary_key as secondary_key " +
- "FROM Orders AS o, " +
- "LATERAL TABLE (Rates(o_rowtime)) AS r " +
- "WHERE currency = o_currency OR secondary_key = o_secondary_key), " +
- "Table3 " +
- "WHERE t3_secondary_key = secondary_key"
+ def testProcTimeTemporalJoinWithView(): Unit = {
Review comment:
Please also add tests for other cases: a non-equi join condition, an equi
join condition combined with other predicates, etc.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
+ }
+
+ snapshot.getPeriod match {
+ // validate period comes from left table's field
+ case r: RexFieldAccess if
r.getReferenceExpr.isInstanceOf[RexCorrelVariable] &&
+
correlate.getCorrelationId.equals(r.getReferenceExpr.asInstanceOf[RexCorrelVariable].id)
=>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field'")
+ }
+ }
+
+ protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput:
RelNode): Boolean = {
+ val isProcessingTime = snapshot.getPeriod.getType match {
+ case t: TimeIndicatorRelDataType if !t.isEventTime => true
+ case _ => false
+ }
+
+ val tableScan = getTableScan(snapshotInput)
+ val snapshotOnLookupSource = tableScan match {
+ case Some(scan) => isTableSourceScan(scan) && isLookupTableSource(scan)
+ case _ => false
+ }
+
+ isProcessingTime && snapshotOnLookupSource
+ }
+
+ private def getTableScan(snapshotInput: RelNode): Option[TableScan] = {
+ snapshotInput match {
+ case tableScan: TableScan
+ => Some(tableScan)
Review comment:
nit: move `=>` up to the end of the previous line?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -40,12 +51,105 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ protected def validateSnapshotInCorrelate(
+ snapshot: LogicalSnapshot,
+ correlate: LogicalCorrelate): Unit = {
+ // period specification check
+ snapshot.getPeriod.getType match {
+ // validate type is event-time or processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
+ }
+
+ snapshot.getPeriod match {
+ // validate period comes from left table's field
+ case r: RexFieldAccess if
r.getReferenceExpr.isInstanceOf[RexCorrelVariable] &&
+
correlate.getCorrelationId.equals(r.getReferenceExpr.asInstanceOf[RexCorrelVariable].id)
=>
+ case _ =>
+ throw new ValidationException("Temporal table join currently only
supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field'")
+ }
+ }
+
+ protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput:
RelNode): Boolean = {
+ val isProcessingTime = snapshot.getPeriod.getType match {
+ case t: TimeIndicatorRelDataType if !t.isEventTime => true
+ case _ => false
+ }
+
+ val tableScan = getTableScan(snapshotInput)
+ val snapshotOnLookupSource = tableScan match {
+ case Some(scan) => isTableSourceScan(scan) && isLookupTableSource(scan)
+ case _ => false
+ }
+
+ isProcessingTime && snapshotOnLookupSource
+ }
+
+ private def getTableScan(snapshotInput: RelNode): Option[TableScan] = {
+ snapshotInput match {
+ case tableScan: TableScan
+ => Some(tableScan)
+ // computed column on lookup table
+ case project: LogicalProject if
trimHep(project.getInput).isInstanceOf[TableScan]
+ => Some(trimHep(project.getInput).asInstanceOf[TableScan])
+ case _ => None
+ }
+ }
+
+ private def isTableSourceScan(relNode: RelNode): Boolean = {
+ relNode match {
+ case _: LogicalTableScan | _: FlinkLogicalLegacyTableSourceScan |
Review comment:
we should also check whether the `table` in a `LogicalTableScan` is a
`TableSourceTable`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val timeColIndex = leftFieldCnt +
rightFields.indexOf(timeAttributeFields.get(0))
+ val timeColDataType = timeAttributeFields.get(0).getType
+ Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
+ } else {
+ None
+ }
+ }
+
+ protected def extractSnapshotTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexInputRef] = {
+ val leftRowType = leftInput.getRowType
+ val leftFields = leftRowType.getFieldList
+ val periodField = snapshot.getPeriod.asInstanceOf[RexFieldAccess].getField
+ if (leftFields.contains(periodField)) {
+ val index = leftRowType.getFieldList.indexOf(periodField)
+ Some(RexInputRef.of(index, leftRowType))
+ } else {
+ None
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
+
+ val leftRowType = leftInput.getRowType
+ val joinCondition = filterCondition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
+ validateSnapshotInCorrelate(snapshot, correlate)
+
+ val (leftJoinKey, rightJoinKey) = {
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
+ val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val rightJoinKey = joinInfo.rightKeys.map(i => {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
+ rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
+ })
+ (leftJoinKey, rightJoinKey)
+ }
+
+ val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+ .getOrElse(throw new ValidationException("Temporal Table Join requires
time attribute in the " +
+ s"left table, but no row time attribute found."))
+
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val temporalCondition = if(isRowTimeTemporalTableJoin(snapshot)) {
+ val rightTimeInputRef = extractRightTimeInputRef(leftInput, snapshot)
+ if (rightTimeInputRef.isEmpty ||
!isRowtimeIndicatorType(rightTimeInputRef.get.getType)) {
+ throw new ValidationException("Event-Time Temporal Table Join
requires both" +
+ s" primary key and row time attribute in versioned table," +
+ s" but no row time attribute found.")
+ }
+ // Deal primary key in TemporalJoinRewriteUniqueKeyRule
+ TemporalJoinUtil.makeRowTimeTemporalJoinConditionCall(rexBuilder,
snapshotTimeInputRef,
+ rightTimeInputRef.get, leftJoinKey, rightJoinKey)
+ } else {
+ TemporalJoinUtil.makeProcTimeTemporalJoinConditionCall(
+ rexBuilder, snapshotTimeInputRef, leftJoinKey, rightJoinKey)
+ }
+
+ val builder = call.builder()
+ val condition = builder.and(joinCondition, temporalCondition)
Review comment:
nit: redundant blank
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val timeColIndex = leftFieldCnt +
rightFields.indexOf(timeAttributeFields.get(0))
+ val timeColDataType = timeAttributeFields.get(0).getType
+ Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
+ } else {
+ None
+ }
+ }
+
+ protected def extractSnapshotTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexInputRef] = {
+ val leftRowType = leftInput.getRowType
+ val leftFields = leftRowType.getFieldList
+ val periodField = snapshot.getPeriod.asInstanceOf[RexFieldAccess].getField
+ if (leftFields.contains(periodField)) {
+ val index = leftRowType.getFieldList.indexOf(periodField)
+ Some(RexInputRef.of(index, leftRowType))
+ } else {
+ None
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
+
+ val leftRowType = leftInput.getRowType
+ val joinCondition = filterCondition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
+ validateSnapshotInCorrelate(snapshot, correlate)
+
+ val (leftJoinKey, rightJoinKey) = {
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
+ val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val rightJoinKey = joinInfo.rightKeys.map(i => {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
Review comment:
move this line out of `map`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -103,18 +334,86 @@ class
LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
}
/**
- * Planner rule that matches temporal table join which join condition is true,
- * that means the right input of the Correlate is a Snapshot.
- * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF
T.proctime AS D ON true
+ * Planner rule that matches temporal table join which implemented by lookup
join, the join
+ * condition is true, that means the right input of the Correlate is a
Snapshot.
+ * e.g. SELECT * FROM MyTable AS T JOIN lookupTable FOR SYSTEM_TIME AS OF
T.proctime AS D ON true
*/
+class LogicalCorrelateToJoinFromLookupTableRuleWithoutFilter
+ extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
+ operand(classOf[LogicalCorrelate],
+ operand(classOf[RelNode], any()),
+ operand(classOf[LogicalSnapshot],
+ operand(classOf[RelNode], any()))),
+ "LogicalCorrelateToJoinFromLookupTableRuleWithoutFilter"
+ ) {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val snapshot: LogicalSnapshot = call.rel(2)
+ val snapshotInput: RelNode = trimHep(call.rel(3))
+ isLookupJoin(snapshot, snapshotInput)
+ }
+
+ override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+ call.builder().literal(true)
+ }
+
+ override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+ call.rels(2).asInstanceOf[LogicalSnapshot]
+ }
+}
+
+/**
+ * Planner rule that matches general temporal table join except lookup join,
the join
+ * condition is not true, that means the right input of the Correlate is a
Filter.
+ * e.g. SELECT * FROM MyTable AS T JOIN lookupTable FOR SYSTEM_TIME AS OF
T.rowtime AS D
Review comment:
lookupTable => temporalTable
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -275,27 +280,47 @@ object StreamExecTemporalJoinToCoProcessTranslator {
joinInfo: JoinInfo,
rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
- checkState(
- !joinInfo.isEqui,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
- val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+
val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
textualRepresentation,
leftType.getFieldCount,
joinInfo,
rexBuilder)
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
- checkState(
- temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
- temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
+ val (leftTimeAttributeInputRef, rightTimeAttributeInputRef) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(
+ temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+ temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
+ "Missing %s in Event-Time temporal join condition",
+ TEMPORAL_JOIN_CONDITION)
+ (extractInputRef(
Review comment:
use local variables to store each value of the Tuple2, which would
improve code readability
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
Review comment:
move it to where it is used
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
Review comment:
`timeAttributeFields` is never null (it is the result of `filter`), so the null check is redundant
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonTemporalTableJoinRule.scala
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.physical.common
+
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess}
+
+import org.apache.flink.table.planner.plan.nodes.logical._
+import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable,
TableSourceTable, TimeIndicatorRelDataType}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.connector.source.LookupTableSource
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTemporalJoin
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin
+import org.apache.flink.table.sources.LookupableTableSource
+
+/**
+ * Base implementation that matches temporal join node.
+ *
+ * <p> The temporal join node is a [[FlinkLogicalJoin]] whose left input is a
[[FlinkLogicalRel]]
+ * and right input is a [[FlinkLogicalSnapshot]].
+ * It may be translated into [[StreamExecLookupJoin]] or
[[StreamExecLegacyTemporalJoin]] in
+ * physical phase.
+ */
+trait CommonTemporalTableJoinRule {
Review comment:
Similar comments as for `LogicalCorrelateToJoinFromTemporalTableRule`.
Also, consider creating a `common` package so that both logical rules and
physical rules can share the common utils.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val timeColIndex = leftFieldCnt +
rightFields.indexOf(timeAttributeFields.get(0))
+ val timeColDataType = timeAttributeFields.get(0).getType
+ Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
+ } else {
+ None
+ }
+ }
+
+ protected def extractSnapshotTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexInputRef] = {
+ val leftRowType = leftInput.getRowType
+ val leftFields = leftRowType.getFieldList
+ val periodField = snapshot.getPeriod.asInstanceOf[RexFieldAccess].getField
+ if (leftFields.contains(periodField)) {
+ val index = leftRowType.getFieldList.indexOf(periodField)
+ Some(RexInputRef.of(index, leftRowType))
+ } else {
+ None
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
+
+ val leftRowType = leftInput.getRowType
+ val joinCondition = filterCondition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
+ validateSnapshotInCorrelate(snapshot, correlate)
+
+ val (leftJoinKey, rightJoinKey) = {
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
+ val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val rightJoinKey = joinInfo.rightKeys.map(i => {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
+ rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
+ })
+ (leftJoinKey, rightJoinKey)
+ }
+
+ val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+ .getOrElse(throw new ValidationException("Temporal Table Join requires
time attribute in the " +
+ s"left table, but no row time attribute found."))
+
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val temporalCondition = if(isRowTimeTemporalTableJoin(snapshot)) {
+ val rightTimeInputRef = extractRightTimeInputRef(leftInput, snapshot)
+ if (rightTimeInputRef.isEmpty ||
!isRowtimeIndicatorType(rightTimeInputRef.get.getType)) {
+ throw new ValidationException("Event-Time Temporal Table Join
requires both" +
+ s" primary key and row time attribute in versioned table," +
Review comment:
do we really check `primary key` constraint ?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -59,21 +64,21 @@ class StreamExecTemporalJoin(
with StreamPhysicalRel
with StreamExecNode[RowData] {
- override def requireWatermark: Boolean = {
- val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
-
- var rowtimeJoin: Boolean = false
- val visitor = new RexVisitorImpl[Unit](true) {
- override def visitCall(call: RexCall): Unit = {
- if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
- rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
- } else {
- call.getOperands.foreach(node => node.accept(this))
- }
+ def rightInputUniqueKeyContainsJoinKey(): Boolean = {
+ val right = getInput(1)
+ val rightUniqueKeys = getCluster.getMetadataQuery.getUniqueKeys(right)
+ if (rightUniqueKeys != null) {
+ val joinKeys = keyPairs.map(_.target).toArray
+ rightUniqueKeys.exists {
+ uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_))
Review comment:
joinKeys is unique iff joinKeys is a superset of a uniqueKey (every column of the unique key must appear in the join keys), not a subset as checked here
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -275,27 +280,47 @@ object StreamExecTemporalJoinToCoProcessTranslator {
joinInfo: JoinInfo,
rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
- checkState(
- !joinInfo.isEqui,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
- val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+
val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
textualRepresentation,
leftType.getFieldCount,
joinInfo,
rexBuilder)
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
- checkState(
- temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
- temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
+ val (leftTimeAttributeInputRef, rightTimeAttributeInputRef) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(
+ temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+ temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
+ "Missing %s in Event-Time temporal join condition",
+ TEMPORAL_JOIN_CONDITION)
+ (extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
+ textualRepresentation),
+ temporalJoinConditionExtractor.rightTimeAttribute.map(
+ rightTimeAttribute =>
+ extractInputRef(
+ rightTimeAttribute,
+ textualRepresentation
+ ) - leftType.getFieldCount))
+ } else {
+ val leftTimeAttributes = leftInput.getRowType.getFieldList
Review comment:
what if the left side has both a row-time attribute and a proc-time
attribute?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -339,75 +367,50 @@ object StreamExecTemporalJoinToCoProcessTranslator {
var rightTimeAttribute: Option[RexNode] = None
- var rightPrimaryKeyExpression: Option[RexNode] = None
+ var rightPrimaryKeyExpression: Option[Array[RexNode]] = None
override def visitCall(call: RexCall): RexNode = {
if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
return super.visitCall(call)
}
- checkState(
- leftTimeAttribute.isEmpty
- && rightPrimaryKeyExpression.isEmpty
- && rightTimeAttribute.isEmpty,
- "Multiple %s functions in [%s]",
- TEMPORAL_JOIN_CONDITION,
- textualRepresentation)
-
- if (TemporalJoinUtil.isRowtimeCall(call)) {
+ if (TemporalJoinUtil.isRowTimeTemporalJoinConditionCall(call)) {
leftTimeAttribute = Some(call.getOperands.get(0))
rightTimeAttribute = Some(call.getOperands.get(1))
-
- rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(2)))
-
- if (!isRowtimeIndicatorType(rightTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non rowtime timeAttribute [${rightTimeAttribute.get.getType}] " +
- s"used to create TemporalTableFunction")
- }
- if (!isRowtimeIndicatorType(leftTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non rowtime timeAttribute [${leftTimeAttribute.get.getType}] " +
- s"passed as the argument to TemporalTableFunction")
- }
- }
- else if (TemporalJoinUtil.isProctimeCall(call)) {
+ rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(4)))
+ } else {
leftTimeAttribute = Some(call.getOperands.get(0))
- rightPrimaryKeyExpression =
Some(validateRightPrimaryKey(call.getOperands.get(1)))
-
- if (!isProctimeIndicatorType(leftTimeAttribute.get.getType)) {
- throw new ValidationException(
- s"Non processing timeAttribute [${leftTimeAttribute.get.getType}]
" +
- s"passed as the argument to TemporalTableFunction")
- }
- }
- else {
- throw new IllegalStateException(
- s"Unsupported invocation $call in [$textualRepresentation]")
}
rexBuilder.makeLiteral(true)
}
- private def validateRightPrimaryKey(rightPrimaryKey: RexNode): RexNode = {
- if (joinInfo.rightKeys.size() != 1) {
+ private def validateRightPrimaryKey(rightPrimaryKey: RexNode):
Array[RexNode] = {
Review comment:
use `match`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -74,23 +178,150 @@ abstract class
LogicalCorrelateToJoinFromTemporalTableRule(
val rel = builder.build()
call.transformTo(rel)
}
+}
+
+
+/**
+ * General temporal table join rule to rewrite the original Correlate into a
Join.
+ */
+abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends LogicalCorrelateToJoinFromTemporalTableRule(operand, description) {
+
+ protected def extractRightTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexNode] = {
+ val rightFields = snapshot.getRowType.getFieldList.asScala
+ val timeAttributeFields = rightFields.filter(
+ f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ val rexBuilder = snapshot.getCluster.getRexBuilder
+
+ if (timeAttributeFields != null && timeAttributeFields.length == 1) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val timeColIndex = leftFieldCnt +
rightFields.indexOf(timeAttributeFields.get(0))
+ val timeColDataType = timeAttributeFields.get(0).getType
+ Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
+ } else {
+ None
+ }
+ }
+
+ protected def extractSnapshotTimeInputRef(
+ leftInput: RelNode,
+ snapshot: LogicalSnapshot): Option[RexInputRef] = {
+ val leftRowType = leftInput.getRowType
+ val leftFields = leftRowType.getFieldList
+ val periodField = snapshot.getPeriod.asInstanceOf[RexFieldAccess].getField
+ if (leftFields.contains(periodField)) {
+ val index = leftRowType.getFieldList.indexOf(periodField)
+ Some(RexInputRef.of(index, leftRowType))
+ } else {
+ None
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
+
+ val leftRowType = leftInput.getRowType
+ val joinCondition = filterCondition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
+ validateSnapshotInCorrelate(snapshot, correlate)
+
+ val (leftJoinKey, rightJoinKey) = {
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
+ val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val rightJoinKey = joinInfo.rightKeys.map(i => {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
+ rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
+ })
+ (leftJoinKey, rightJoinKey)
+ }
+
+ val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+ .getOrElse(throw new ValidationException("Temporal Table Join requires
time attribute in the " +
+ s"left table, but no row time attribute found."))
+
+ val rexBuilder = correlate.getCluster.getRexBuilder
+ val temporalCondition = if(isRowTimeTemporalTableJoin(snapshot)) {
+ val rightTimeInputRef = extractRightTimeInputRef(leftInput, snapshot)
+ if (rightTimeInputRef.isEmpty ||
!isRowtimeIndicatorType(rightTimeInputRef.get.getType)) {
+ throw new ValidationException("Event-Time Temporal Table Join
requires both" +
+ s" primary key and row time attribute in versioned table," +
+ s" but no row time attribute found.")
Review comment:
can be found
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,19 +18,27 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
Review comment:
ditto
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -553,6 +570,20 @@ class FlinkRelMdUniqueKeys private extends
MetadataHandler[BuiltInMetadata.Uniqu
}
}
+ def getUniqueKeys(
+ subset: HepRelVertex,
+ mq: RelMetadataQuery,
+ ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+ mq.getUniqueKeys(subset.getCurrentRel, ignoreNulls)
+ }
+
+ def getUniqueKeys(
+ subset: WatermarkAssigner,
+ mq: RelMetadataQuery,
+ ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+ mq.getUniqueKeys(subset.getInput, ignoreNulls)
+ }
Review comment:
ditto
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalRowTimeJoinOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping on the state collection of probe and build
records to process
+ * on next watermark. The idea is that between watermarks we are collecting
those elements
+ * and once we are sure that there will be no updates we emit the correct
result and clean up the
+ * state.
+ *
+ * <p>Cleaning up the state drops all of the "old" values from the probe side,
where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the
similar fashion,
+ * however we always keep at least one record - the latest one - even if it's
past the last
+ * watermark.
+ *
+ * <p>One more trick is how the emitting results and cleaning up is triggered.
It is achieved
+ * by registering timers for the keys. We could register a timer for every
probe and build
+ * side element's event time (when watermark exceeds this timer, that's when
we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered
timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if
we
+ * had received Watermark(10), it would trigger 5 separate timers for the same
key. To avoid that
+ * we always keep only one single registered timer for any given key,
registered for the minimal
+ * value. Upon triggering it, we process all records with event times older
then or equal to
+ * currentWatermark.
+ */
+public class LegacyTemporalRowTimeJoinOperator
Review comment:
ditto
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -327,6 +345,16 @@ object StreamExecTemporalJoinToCoProcessTranslator {
inputReferenceVisitor.getFields.head
}
+ private def extractInputRefs(rexNode: RexNode, textualRepresentation:
String): Array[Int] = {
+ val inputReferenceVisitor = new InputRefVisitor
+ rexNode.accept(inputReferenceVisitor)
+ checkState(
+ inputReferenceVisitor.getFields.length == 1,
+ "Failed to find input reference in [%s]",
+ textualRepresentation)
+ inputReferenceVisitor.getFields
Review comment:
please use `RelOptUtil.InputFinder.bits(rexNode)`
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
##########
@@ -132,7 +127,7 @@ abstract class BaseSnapshotOnTableScanRule(description:
String)
val join = call.rel[FlinkLogicalJoin](0)
val snapshot = call.rel[FlinkLogicalSnapshot](2)
val tableScan = call.rel[TableScan](3)
- matches(join, snapshot, tableScan)
+ matches(join, snapshot, tableScan)
Review comment:
revert this change
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalProcessTimeJoinOperator.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+/**
+ * The operator to temporal join a stream on processing time.
+ */
+public class LegacyTemporalProcessTimeJoinOperator
Review comment:
`LegacyTemporalProcessTimeJoinOperator` and
`TemporalProcessTimeJoinOperator` are almost the same; can we reuse code between them?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -504,6 +512,39 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
None
}
+ case temporalJoin: StreamExecTemporalJoin =>
+ val left = temporalJoin.getLeft.asInstanceOf[StreamPhysicalRel]
+ val right = temporalJoin.getRight.asInstanceOf[StreamPhysicalRel]
+
+ // the left input required trait depends on it's parent in temporal
join
+ // the left input will send message to parent
+ val requiredUpdateBeforeByParent = requiredTrait.updateKind ==
UpdateKind.ONLY_UPDATE_AFTER
Review comment:
BEFORE_AND_AFTER ?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -42,11 +41,11 @@ import org.apache.calcite.rex.{RexCall, RexInputRef,
RexNode}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
-
import com.google.common.collect.ImmutableSet
-
import java.util
+import org.apache.calcite.plan.hep.HepRelVertex
Review comment:
reorder import
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -244,7 +243,25 @@ class FlinkRelMdUniqueKeys private extends
MetadataHandler[BuiltInMetadata.Uniqu
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
val rankFunColumnIndex =
RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
- if (rankFunColumnIndex < 0) {
+ //TODO current deduplicate on row time is still a Rank,
+ // remove this after support deduplicate on row time
+ val canConvertToDeduplicate: Boolean = {
+ val rankRange = rel.rankRange
+ val isRowNumberType = rel.rankType == RankType.ROW_NUMBER
+ val isLimit1 = rankRange match {
+ case rankRange: ConstantRankRange =>
+ rankRange.getRankStart() == 1 && rankRange.getRankEnd() == 1
+ case _ => false
+ }
+ !rel.outputRankNumber && isRowNumberType && isLimit1
+ }
+
+ if (canConvertToDeduplicate) {
+ val retSet = new JHashSet[ImmutableBitSet]
+ retSet.add(rel.partitionKey)
+ retSet
+ }
Review comment:
please add related tests in FlinkRelMdUniqueKeysTest
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -17,75 +17,152 @@
*/
package org.apache.flink.table.planner.plan.utils
-import org.apache.flink.util.Preconditions.checkArgument
-
+import org.apache.calcite.rel.core.JoinInfo
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import scala.collection.JavaConversions._
+
/**
- * Utilities for temporal table join
+ * Utilities for temporal table join.
*/
object TemporalJoinUtil {
//
----------------------------------------------------------------------------------------
- // Temporal TableFunction Join Utilities
+ // Temporal Join Condition Utilities
//
----------------------------------------------------------------------------------------
/**
- * [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly
defines
+ * [[TEMPORAL_JOIN_CONDITION]] is a specific join condition which correctly
defines
* references to rightTimeAttribute, rightPrimaryKeyExpression and
leftTimeAttribute.
- * The condition is used to mark this is a temporal tablefunction join.
- * Later rightTimeAttribute, rightPrimaryKeyExpression and
leftTimeAttribute will be
- * extracted from the condition.
+ * The condition is used to mark this is a temporal table join and ensure
columns these
+ * expressions depends on will not be pruned. The join key pair is
necessary to ensure the
+ * the condition will not push down.
+ *
+ * The rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute
will be
+ * extracted from the condition in physical phase.
*/
val TEMPORAL_JOIN_CONDITION = new SqlFunction(
"__TEMPORAL_JOIN_CONDITION",
SqlKind.OTHER_FUNCTION,
ReturnTypes.BOOLEAN_NOT_NULL,
null,
OperandTypes.or(
+ // right time attribute and primary key are required in event-time
temporal table join,
+ OperandTypes.sequence(
+ "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY,
PRIMARY_KEY)'",
+ OperandTypes.DATETIME,
+ OperandTypes.DATETIME,
+ OperandTypes.ANY,
+ OperandTypes.ANY,
+ OperandTypes.ANY),
+ // the primary key may inferred later in event-time temporal table join,
OperandTypes.sequence(
- "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+ "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY)'",
OperandTypes.DATETIME,
OperandTypes.DATETIME,
+ OperandTypes.ANY,
OperandTypes.ANY),
+ // Only left time attribute is required for processing-time temporal
table join,
+ // primary key is optional
OperandTypes.sequence(
- "'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+ "'(LEFT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY)'",
OperandTypes.DATETIME,
+ OperandTypes.ANY,
+ OperandTypes.ANY,
OperandTypes.ANY)),
SqlFunctionCategory.SYSTEM)
- def isRowtimeCall(call: RexCall): Boolean = {
- checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
- call.getOperands.size() == 3
- }
+ val TEMPORAL_JOIN_LEFT_KEY = new SqlFunction(
+ "__TEMPORAL_JOIN_LEFT_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
+
+ val TEMPORAL_JOIN_RIGHT_KEY = new SqlFunction(
+ "TEMPORAL_JOIN_RIGHT_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
+
+ val TEMPORAL_JOIN_CONDITION_PRIMARY_KEY = new SqlFunction(
+ "__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
- def isProctimeCall(call: RexCall): Boolean = {
- checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
- call.getOperands.size() == 2
- }
def makeRowTimeTemporalJoinConditionCall(
rexBuilder: RexBuilder,
leftTimeAttribute: RexNode,
rightTimeAttribute: RexNode,
- rightPrimaryKeyExpression: RexNode): RexNode = {
+ leftJoinKeyExpression: Seq[RexNode],
+ rightJoinKeyExpression: Seq[RexNode],
+ rightPrimaryKeyExpression: Seq[RexNode]): RexNode = {
rexBuilder.makeCall(
TEMPORAL_JOIN_CONDITION,
leftTimeAttribute,
rightTimeAttribute,
+ makeLeftJoinKeyCall(rexBuilder, leftJoinKeyExpression),
+ makeRightJoinKeyCall(rexBuilder, rightJoinKeyExpression),
+ makePrimaryKeyCall(rexBuilder, rightPrimaryKeyExpression))
+ }
+
+ def makeRowTimeTemporalJoinConditionCall(
+ rexBuilder: RexBuilder,
+ leftTimeAttribute: RexNode,
+ rightTimeAttribute: RexNode,
+ leftJoinKeyExpression: Seq[RexNode],
+ rightJoinKeyExpression: Seq[RexNode]): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_CONDITION,
+ leftTimeAttribute,
+ rightTimeAttribute,
+ makeLeftJoinKeyCall(rexBuilder, leftJoinKeyExpression),
+ makeRightJoinKeyCall(rexBuilder, rightJoinKeyExpression))
+ }
+
+ private def makePrimaryKeyCall(
+ rexBuilder: RexBuilder,
+ rightPrimaryKeyExpression: Seq[RexNode]): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_CONDITION_PRIMARY_KEY,
rightPrimaryKeyExpression)
}
+ private def makeLeftJoinKeyCall(
+ rexBuilder: RexBuilder,
+ keyExpression: Seq[RexNode]): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_LEFT_KEY,
+ keyExpression)
+ }
+
+ private def makeRightJoinKeyCall(
+ rexBuilder: RexBuilder,
+ keyExpression: Seq[RexNode]): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_RIGHT_KEY,
+ keyExpression)
+ }
+
def makeProcTimeTemporalJoinConditionCall(
rexBuilder: RexBuilder,
leftTimeAttribute: RexNode,
- rightPrimaryKeyExpression: RexNode): RexNode = {
+ leftJoinKeyExpression: Seq[RexNode],
+ rightJoinKeyExpression: Seq[RexNode]): RexNode = {
Review comment:
indent
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -103,4 +180,25 @@ object TemporalJoinUtil {
hasTemporalJoinCondition
}
+ def isRowTimeJoin(rexBuilder: RexBuilder, joinInfo: JoinInfo): Boolean = {
+ val nonEquiJoinRex = joinInfo.getRemaining(rexBuilder)
+
+ var rowtimeJoin: Boolean = false
+ val visitor = new RexVisitorImpl[Unit](true) {
+ override def visitCall(call: RexCall): Unit = {
+ if (isRowTimeTemporalJoinConditionCall(call)) {
+ rowtimeJoin = true
+ } else {
+ call.getOperands.foreach(node => node.accept(this))
Review comment:
super.visitCall(call)
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode, RexShuttle}
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin,
FlinkLogicalRel, FlinkLogicalSnapshot}
+import
org.apache.flink.table.planner.plan.rules.physical.common.CommonTemporalTableJoinRule
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites temporal join with extracted primary key,
Event-time temporal
+ * table join requires primary key and row time attribute of versioned table.
The versioned table
+ * could be a table source or a view only if it contains the unique key and
time attribute.
+ *
+ * <p> Flink support extract the primary key and row time attribute from the
view if the view comes
+ * from [[LogicalRank]] node which can convert to a [[Deduplicate]] node.
+ */
+class TemporalJoinRewriteWithUniqueKeyRule extends RelOptRule(
+ operand(classOf[FlinkLogicalJoin],
+ operand(classOf[FlinkLogicalRel], any()),
+ operand(classOf[FlinkLogicalSnapshot],
+ operand(classOf[FlinkLogicalRel], any()))),
+ "TemporalJoinRewriteWithUniqueKeyRule")
+ with CommonTemporalTableJoinRule {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join = call.rel[FlinkLogicalJoin](0)
+ val snapshot = call.rel[FlinkLogicalSnapshot](2)
+ val snapshotInput = call.rel[FlinkLogicalRel](3)
+
+ val isTemporalJoin = matches(snapshot)
+ val canConvertToLookup = canConvertToLookupJoin(snapshot, snapshotInput)
+ val supportedJoinTypes = Seq(JoinRelType.INNER)
+
+ isTemporalJoin && !canConvertToLookup &&
supportedJoinTypes.contains(join.getJoinType)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join = call.rel[FlinkLogicalJoin](0)
+ val leftInput = call.rel[FlinkLogicalRel](1)
+ val snapshot = call.rel[FlinkLogicalSnapshot](2)
+
+ val joinCondition = join.getCondition
+
+ val newJoinCondition = joinCondition.accept(new RexShuttle {
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION &&
+ isRowTimeTemporalTableJoin(snapshot)) {
+ val snapshotTimeInputRef = call.operands(0)
+ val rightTimeInputRef = call.operands(1)
+ val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
+ val rightJoinKey = call.operands(3).asInstanceOf[RexCall].operands
+
+ val rexBuilder = join.getCluster.getRexBuilder
+ val primaryKeyInputRefs = extractPrimaryKeyInputRefs(leftInput,
snapshot, rexBuilder)
+ if (primaryKeyInputRefs.isEmpty) {
+ throw new ValidationException("Event-Time Temporal Table Join
requires both" +
+ s" primary key and row time attribute in versioned table," +
+ s" but no primary key found.")
+ }
+ TemporalJoinUtil.makeRowTimeTemporalJoinConditionCall(rexBuilder,
snapshotTimeInputRef,
+ rightTimeInputRef, leftJoinKey, rightJoinKey,
primaryKeyInputRefs.get)
+ }
+ else {
+ super.visitCall(call)
+ }
+ }
+ })
+ val rewriteJoin = FlinkLogicalJoin.create(
+ leftInput, snapshot, newJoinCondition, join.getJoinType)
+ call.transformTo(rewriteJoin)
+ }
+
+ private def extractPrimaryKeyInputRefs(
+ leftInput: RelNode,
+ snapshot: FlinkLogicalSnapshot,
+ rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
+ val rightFields = snapshot.getRowType.getFieldList
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
+
+ val uniqueKeys = fmq.getUniqueKeys(snapshot.getInput())
+ val fields = snapshot.getRowType.getFieldList
+
+ if (uniqueKeys != null && uniqueKeys.size() > 0) {
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ uniqueKeys
+ .filter(_.nonEmpty)
+ .map(_.toArray
+ .map(fields)
+ .map(f => rexBuilder.makeInputRef(
+ f.getType,
+ leftFieldCnt + rightFields.indexOf(f)))
+ .toSeq)
+ .toArray
+ .sortBy(_.length)
+ .headOption
Review comment:
This chained expression is hard to read; it would be better to add some comments
and split it into a few steps.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
##########
@@ -17,129 +17,256 @@
*/
package org.apache.flink.table.planner.plan.stream.sql.join
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase}
-
-import org.hamcrest.Matchers.containsString
+import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
-import java.sql.Timestamp
-
+/**
+ * Test temporal join in stream mode.
+ */
class TemporalJoinTest extends TableTestBase {
val util: StreamTableTestUtil = streamTestUtil()
- private val orders = util.addDataStream[(Long, String)](
- "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+ util.addTable(
Review comment:
ditto
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -17,75 +17,152 @@
*/
package org.apache.flink.table.planner.plan.utils
-import org.apache.flink.util.Preconditions.checkArgument
-
+import org.apache.calcite.rel.core.JoinInfo
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import scala.collection.JavaConversions._
+
/**
- * Utilities for temporal table join
+ * Utilities for temporal table join.
*/
object TemporalJoinUtil {
//
----------------------------------------------------------------------------------------
- // Temporal TableFunction Join Utilities
+ // Temporal Join Condition Utilities
//
----------------------------------------------------------------------------------------
/**
- * [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly
defines
+ * [[TEMPORAL_JOIN_CONDITION]] is a specific join condition which correctly
defines
* references to rightTimeAttribute, rightPrimaryKeyExpression and
leftTimeAttribute.
- * The condition is used to mark this is a temporal tablefunction join.
- * Later rightTimeAttribute, rightPrimaryKeyExpression and
leftTimeAttribute will be
- * extracted from the condition.
+ * The condition is used to mark this is a temporal table join and ensure
columns these
+ * expressions depends on will not be pruned. The join key pair is
necessary to ensure the
+ * the condition will not push down.
+ *
+ * The rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute
will be
+ * extracted from the condition in physical phase.
*/
val TEMPORAL_JOIN_CONDITION = new SqlFunction(
"__TEMPORAL_JOIN_CONDITION",
SqlKind.OTHER_FUNCTION,
ReturnTypes.BOOLEAN_NOT_NULL,
null,
OperandTypes.or(
+ // right time attribute and primary key are required in event-time
temporal table join,
+ OperandTypes.sequence(
+ "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY,
PRIMARY_KEY)'",
+ OperandTypes.DATETIME,
+ OperandTypes.DATETIME,
+ OperandTypes.ANY,
+ OperandTypes.ANY,
+ OperandTypes.ANY),
+ // the primary key may inferred later in event-time temporal table join,
OperandTypes.sequence(
- "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+ "'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY)'",
OperandTypes.DATETIME,
OperandTypes.DATETIME,
+ OperandTypes.ANY,
OperandTypes.ANY),
+ // Only left time attribute is required for processing-time temporal
table join,
+ // primary key is optional
OperandTypes.sequence(
- "'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+ "'(LEFT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY)'",
OperandTypes.DATETIME,
+ OperandTypes.ANY,
+ OperandTypes.ANY,
OperandTypes.ANY)),
SqlFunctionCategory.SYSTEM)
- def isRowtimeCall(call: RexCall): Boolean = {
- checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
- call.getOperands.size() == 3
- }
+ val TEMPORAL_JOIN_LEFT_KEY = new SqlFunction(
+ "__TEMPORAL_JOIN_LEFT_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
+
+ val TEMPORAL_JOIN_RIGHT_KEY = new SqlFunction(
+ "TEMPORAL_JOIN_RIGHT_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
+
+ val TEMPORAL_JOIN_CONDITION_PRIMARY_KEY = new SqlFunction(
+ "__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ null,
+ OperandTypes.ARRAY,
+ SqlFunctionCategory.SYSTEM)
- def isProctimeCall(call: RexCall): Boolean = {
- checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
- call.getOperands.size() == 2
- }
def makeRowTimeTemporalJoinConditionCall(
rexBuilder: RexBuilder,
leftTimeAttribute: RexNode,
rightTimeAttribute: RexNode,
- rightPrimaryKeyExpression: RexNode): RexNode = {
+ leftJoinKeyExpression: Seq[RexNode],
+ rightJoinKeyExpression: Seq[RexNode],
+ rightPrimaryKeyExpression: Seq[RexNode]): RexNode = {
Review comment:
indent
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
##########
@@ -17,95 +17,136 @@
*/
package org.apache.flink.table.planner.plan.batch.sql.join
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
-
-import org.hamcrest.Matchers.containsString
import org.junit.Test
-import java.sql.Timestamp
-
+/**
+ * Test temporal join in batch mode.
+ *
+ * <p> Flink only supports lookup join in batch mode, the others Temporal join
is not supported yet.
+ */
class TemporalJoinTest extends TableTestBase {
val util: BatchTableTestUtil = batchTestUtil()
- val orders = util.addDataStream[(Long, String, Timestamp)](
- "Orders", 'o_amount, 'o_currency, 'o_rowtime)
+ util.addTable(
+ """
+ |CREATE TABLE Orders (
+ | o_amount INT,
+ | o_currency STRING,
+ | o_rowtime TIMESTAMP(3),
+ | o_proctime as PROCTIME(),
+ | WATERMARK FOR o_rowtime AS o_rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin)
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistory (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin)
+
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistoryWithPK (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin)
+
+ util.addTable(
+ """
+ |CREATE TABLE RatesOnly (
+ | currency STRING,
+ | rate INT,
+ | proctime AS PROCTIME()
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin)
+
+ util.addTable(
+ " CREATE VIEW DeduplicatedView as SELECT currency, rate, rowtime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime
DESC) AS rowNum " +
+ " FROM RatesHistory" +
+ " ) T " +
+ " WHERE rowNum = 1")
+
+ util.addTable(
+ " CREATE VIEW latestView as SELECT currency, rate, proctime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime
DESC) AS rowNum " +
+ " FROM RatesOnly" +
+ " ) T" +
+ " WHERE rowNum = 1")
+
+ util.addTable("CREATE VIEW latest_rates AS SELECT currency, LAST_VALUE(rate)
AS rate " +
+ "FROM RatesHistory " +
+ "GROUP BY currency ")
Review comment:
move these table and view definitions into a `before` method
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
##########
@@ -17,129 +17,256 @@
*/
package org.apache.flink.table.planner.plan.stream.sql.join
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase}
-
-import org.hamcrest.Matchers.containsString
+import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
-import java.sql.Timestamp
-
+/**
+ * Test temporal join in stream mode.
+ */
class TemporalJoinTest extends TableTestBase {
val util: StreamTableTestUtil = streamTestUtil()
- private val orders = util.addDataStream[(Long, String)](
- "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+ util.addTable(
+ """
+ |CREATE TABLE Orders (
+ | o_amount INT,
+ | o_currency STRING,
+ | o_rowtime TIMESTAMP(3),
+ | o_proctime as PROCTIME(),
+ | WATERMARK FOR o_rowtime AS o_rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistory (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- private val ratesHistory = util.addDataStream[(String, Int, Timestamp)](
- "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+ util.addTable(
+ """
+ |CREATE TABLE RatesHistoryWithPK (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- util.addFunction(
- "Rates",
- ratesHistory.createTemporalTableFunction($"rowtime", $"currency"))
+ util.addTable(
+ """
+ |CREATE TABLE RatesOnly (
+ | currency STRING,
+ | rate INT,
+ | proctime AS PROCTIME()
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
- private val proctimeOrders = util.addDataStream[(Long, String)](
- "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)
+ util.addTable(
+ " CREATE VIEW DeduplicatedView as SELECT currency, rate, rowtime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime
DESC) AS rowNum " +
+ " FROM RatesHistory" +
+ " ) T " +
+ " WHERE rowNum = 1")
- private val proctimeRatesHistory = util.addDataStream[(String, Int)](
- "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)
+ util.addTable(
+ " CREATE VIEW latestView as SELECT T.currency, T.rate, T.proctime FROM " +
+ " (SELECT *, " +
+ " ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime
DESC) AS rowNum " +
+ " FROM RatesOnly" +
+ " ) T " +
+ " WHERE T.rowNum = 1")
- util.addFunction(
- "ProctimeRates",
- proctimeRatesHistory.createTemporalTableFunction($"proctime", $"currency"))
+ util.addTable("CREATE VIEW latest_rates AS SELECT currency, LAST_VALUE(rate)
AS rate " +
+ "FROM RatesHistory " +
+ "GROUP BY currency ")
@Test
- def testSimpleJoin(): Unit = {
+ def testEventTimeTemporalJoin(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM Orders AS o, " +
- "LATERAL TABLE (Rates(o.o_rowtime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_rowtime as r " +
+ "on o.o_currency = r.currency"
util.verifyPlan(sqlQuery)
}
@Test
- def testSimpleProctimeJoin(): Unit = {
+ def testEventTimeTemporalJoinWithView(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM ProctimeOrders AS o, " +
- "LATERAL TABLE (ProctimeRates(o.o_proctime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "DeduplicatedView " +
+ "FOR SYSTEM_TIME AS OF o.o_rowtime as r1 " +
+ "on o.o_currency = r1.currency"
util.verifyPlan(sqlQuery)
}
@Test
- def testJoinOnQueryLeft(): Unit = {
- val orders = util.tableEnv.sqlQuery("SELECT * FROM Orders WHERE o_amount >
1000")
- util.tableEnv.createTemporaryView("Orders2", orders)
-
+ def testProcTimeTemporalJoin(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
- "FROM Orders2 AS o, " +
- "LATERAL TABLE (Rates(o.o_rowtime)) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "latestView " +
+ "FOR SYSTEM_TIME AS OF o.o_proctime as r1 " +
+ "on o.o_currency = r1.currency"
util.verifyPlan(sqlQuery)
}
- /**
- * Test versioned joins with more complicated query.
- * Important thing here is that we have complex OR join condition
- * and there are some columns that are not being used (are being pruned).
- */
@Test
- def testComplexJoin(): Unit = {
- val util = streamTestUtil()
- util.addDataStream[(String, Int)]("Table3", 't3_comment, 't3_secondary_key)
- util.addDataStream[(Timestamp, String, Long, String, Int)](
- "Orders", 'o_rowtime.rowtime, 'o_comment, 'o_amount, 'o_currency,
'o_secondary_key)
-
- util.addDataStream[(Timestamp, String, String, Int, Int)](
- "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate,
'secondary_key)
- val rates = util.tableEnv
- .sqlQuery("SELECT * FROM RatesHistory WHERE rate > 110")
- .createTemporalTableFunction($"rowtime", $"currency")
- util.addTemporarySystemFunction("Rates", rates)
-
- val sqlQuery =
- "SELECT * FROM " +
- "(SELECT " +
- "o_amount * rate as rate, " +
- "secondary_key as secondary_key " +
- "FROM Orders AS o, " +
- "LATERAL TABLE (Rates(o_rowtime)) AS r " +
- "WHERE currency = o_currency OR secondary_key = o_secondary_key), " +
- "Table3 " +
- "WHERE t3_secondary_key = secondary_key"
+ def testProcTimeTemporalJoinWithView(): Unit = {
+ val sqlQuery = "SELECT " +
+ "o_amount * rate as rate " +
+ "FROM Orders AS o JOIN " +
+ "latest_rates " +
+ "FOR SYSTEM_TIME AS OF o.o_proctime as r1 " +
+ "on o.o_currency = r1.currency"
util.verifyPlan(sqlQuery)
}
@Test
- def testUncorrelatedJoin(): Unit = {
- expectedException.expect(classOf[TableException])
- expectedException.expectMessage(containsString("Cannot generate a valid
execution plan"))
+ def testInvalidTemporalTablJoin(): Unit = {
+ util.addTable(
+ """
+ |CREATE TABLE leftTableWithoutTimeAttribute (
+ | o_amount INT,
+ | o_currency STRING,
+ | o_time TIMESTAMP(3)
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ val sqlQuery1 = "SELECT " +
+ "o_amount * rate as rate " +
+ "FROM leftTableWithoutTimeAttribute AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_time as r " +
+ "on o.o_currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery1,
+ s"Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'" +
+ s" left table's time attribute field",
+ classOf[ValidationException])
- val sqlQuery = "SELECT " +
+ util.addTable(
+ """
+ |CREATE TABLE versionedTableWithoutPk (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ val sqlQuery2 = "SELECT " +
+ "o_amount * rate as rate " +
+ "FROM Orders AS o JOIN " +
+ "versionedTableWithoutPk FOR SYSTEM_TIME AS OF o.o_rowtime as r " +
+ "on o.o_currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery2,
+ s"Event-Time Temporal Table Join requires both primary key and row time
attribute in " +
+ s"versioned table, but no primary key found.",
+ classOf[ValidationException])
+
+ util.addTable(
+ """
+ |CREATE TABLE versionedTableWithoutTimeAttribute (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ val sqlQuery3 = "SELECT " +
"o_amount * rate as rate " +
- "FROM Orders AS o, " +
- "LATERAL TABLE (Rates(TIMESTAMP '2016-06-27 10:10:42.123')) AS r " +
- "WHERE currency = o_currency"
+ "FROM Orders AS o JOIN " +
+ "versionedTableWithoutTimeAttribute FOR SYSTEM_TIME AS OF o.o_rowtime as
r " +
+ "on o.o_currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery3,
+ s"Event-Time Temporal Table Join requires both primary key and row time
attribute in " +
+ s"versioned table, but no row time attribute found.",
+ classOf[ValidationException])
- util.verifyExplain(sqlQuery)
+ util.addTable(
+ """
+ |CREATE TABLE versionedTableWithoutRowtime (
+ | currency STRING,
+ | rate INT,
+ | rowtime TIMESTAMP(3),
+ | proctime AS PROCTIME(),
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ val sqlQuery4 = "SELECT " +
+ "o_amount * rate as rate " +
+ "FROM Orders AS o JOIN " +
+ "versionedTableWithoutRowtime FOR SYSTEM_TIME AS OF o.o_rowtime as r " +
+ "on o.o_currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery4,
+ s"Event-Time Temporal Table Join requires both primary key and row time
attribute in " +
+ s"versioned table, but no row time attribute found.",
+ classOf[ValidationException])
Review comment:
another case: the join keys do not match the primary key
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalJoinRule.scala
##########
@@ -18,56 +18,53 @@
package org.apache.flink.table.planner.plan.rules.physical.stream
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.logical._
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin
-import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.containsTemporalJoinCondition
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil,
IntervalJoinUtil}
+import java.util
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
-import java.util
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
Review comment:
reorder imports
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -275,27 +280,47 @@ object StreamExecTemporalJoinToCoProcessTranslator {
joinInfo: JoinInfo,
rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
- checkState(
- !joinInfo.isEqui,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
- val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+
val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
textualRepresentation,
leftType.getFieldCount,
joinInfo,
rexBuilder)
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
- checkState(
- temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
- temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
- "Missing %s in join condition",
- TEMPORAL_JOIN_CONDITION)
+ val (leftTimeAttributeInputRef, rightTimeAttributeInputRef) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(
+ temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+ temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
+ "Missing %s in Event-Time temporal join condition",
+ TEMPORAL_JOIN_CONDITION)
+ (extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
+ textualRepresentation),
+ temporalJoinConditionExtractor.rightTimeAttribute.map(
+ rightTimeAttribute =>
+ extractInputRef(
+ rightTimeAttribute,
+ textualRepresentation
+ ) - leftType.getFieldCount))
+ } else {
+ val leftTimeAttributes = leftInput.getRowType.getFieldList
+ .filter(f => f.getType.isInstanceOf[TimeIndicatorRelDataType])
+ if (leftTimeAttributes.isEmpty) {
+ throw new ValidationException(
Review comment:
please throw the same exception for the validation
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]