godfreyhe commented on a change in pull request #13299:
URL: https://github.com/apache/flink/pull/13299#discussion_r505142151
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
##########
@@ -73,6 +122,28 @@ class ChangelogModeInferenceTest(aggMode: AggMode) extends
TableTestBase {
util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
+ @Test
+ def testTemporalJoinWithDeduplicateView(): Unit = {
Review comment:
It would be better to do some refactoring here: many of these test cases do not
involve aggregation, so they do not need to run under every `AggMode` parameterization.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRuleTest.scala
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import
org.apache.flink.table.planner.plan.optimize.program.{FlinkChainedProgram,
FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE, StreamOptimizeContext}
+import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase, TableTestUtil}
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[LogicalCorrelateToJoinFromTemporalTableRule]].
+ */
+class LogicalCorrelateToJoinFromTemporalTableRuleTest extends TableTestBase {
+
+ protected val util: StreamTableTestUtil = streamTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.addTable(
+ """
+ |CREATE TABLE T1 (
+ | id STRING,
+ | mount INT,
+ | proctime as PROCTIME(),
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+
+ //lookup table, CollectionTableSource implements LookupableTableSource
interface
+ util.addTable(
+ """
+ |CREATE TABLE T2 (
+ | id STRING,
+ | rate INT,
+ | PRIMARY KEY(id) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+
+ // non-lookup table
+ util.addTableSource[(String, Int)]("T3", 'id, 'rate, 'rowtime.rowtime())
+ }
+
+ @Test
+ def testLookupJoinWithFilter(): Unit = {
+
setUpCurrentRule(LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITH_FILTER)
+ util.verifyPlan("SELECT * FROM T1 JOIN T2 FOR SYSTEM_TIME AS OF
T1.proctime AS dimTable " +
+ "ON T1.id = dimTable.id AND dimTable.rate > 10")
+ }
+
+ @Test
+ def testLeftLookupJoinOnTrue(): Unit = {
+ // lookup join also does not support ON TRUE condition in runtime
+
setUpCurrentRule(LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER)
+ util.verifyPlan("SELECT * FROM T1 LEFT JOIN T2 FOR SYSTEM_TIME AS OF " +
+ "T1.proctime AS dimTable ON TRUE")
+ }
+
+ @Test
+ def testProcTimeTemporalJoinWithFilter(): Unit = {
+ setUpCurrentRule(LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER)
+ util.verifyPlan("SELECT * FROM T1 JOIN T3 FOR SYSTEM_TIME AS OF
T1.proctime AS dimTable " +
+ "ON T1.id = dimTable.id AND dimTable.rate > 10")
+ }
+
+ @Test
+ def testRowTimeTemporalJoinWithFilter(): Unit = {
+ setUpCurrentRule(LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER)
+ util.verifyPlan("SELECT * FROM T1 JOIN T3 FOR SYSTEM_TIME AS OF T1.rowtime
AS dimTable " +
+ "ON T1.id = dimTable.id AND dimTable.rate > 10")
+ }
+
+ @Test
+ def testRowTimeLeftTemporalJoinWithFilter(): Unit = {
+ setUpCurrentRule(LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER)
+ util.verifyPlan("SELECT * FROM T1 LEFT JOIN T3 FOR SYSTEM_TIME AS OF
T1.rowtime AS dimTable " +
+ "ON T1.id = dimTable.id AND dimTable.rate > 10")
+ }
+
+ @Test
+ def testLookupJoinOnTrue(): Unit = {
Review comment:
Please also add a test with a non-equi join condition.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -241,7 +233,7 @@ class StreamExecTemporalJoinToCoProcessTranslator private (
val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
joinType match {
- case JoinRelType.INNER =>
+ case JoinRelType.INNER | JoinRelType.LEFT =>
Review comment:
Does the runtime support LEFT joins? This change should be covered by at
least one IT case.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.common
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.connector.source.LookupTableSource
+import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
FlinkLogicalRel, FlinkLogicalSnapshot, FlinkLogicalTableSourceScan}
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecLookupJoin,
StreamExecTemporalJoin}
+import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable,
TableSourceTable, TimeIndicatorRelDataType}
+import org.apache.flink.table.sources.LookupableTableSource
+
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess}
+
+/**
+ * Base implementation that matches temporal join node.
+ *
+ * <p> The initial temporal table join (FOR SYSTEM_TIME AS OF) is a
Correlate, rewrite it into
+ * a Join to make join condition can be pushed-down. The join will be
translated into
+ * [[StreamExecLookupJoin]] in physical or translated into
[[StreamExecTemporalJoin]].
+ */
+trait CommonTemporalTableJoinRule {
+
+ protected def matches(snapshot: FlinkLogicalSnapshot): Boolean = {
+
+ // period specification check
+ snapshot.getPeriod match {
+ // it's left table's field, pass
+ case r: RexFieldAccess if
r.getReferenceExpr.isInstanceOf[RexCorrelVariable] =>
+ case _ =>
+ throw new TableException("Temporal table join currently only supports
" +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field, doesn't
support 'PROCTIME()'")
+ }
+
+ snapshot.getPeriod.getType match {
+ // supports both event-time and processing time
+ case t: TimeIndicatorRelDataType =>
+ case _ =>
+ throw new TableException("Temporal table join currently only supports
" +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field")
+ }
+ true
+ }
+
+ protected def canConvertToLookupJoin(
+ snapshot: FlinkLogicalSnapshot,
+ snapshotInput: FlinkLogicalRel): Boolean = {
+ val isProcessingTime = snapshot.getPeriod.getType match {
+ case t: TimeIndicatorRelDataType if !t.isEventTime => true
+ case _ => false
+ }
+
+ val tableScan = getTableScan(snapshotInput)
+ val snapshotOnLookupSource = tableScan match {
+ case Some(scan) => isTableSourceScan(scan) && isLookupTableSource(scan)
+ case _ => false
+ }
+
+ isProcessingTime && snapshotOnLookupSource
+ }
+
+ private def getTableScan(snapshotInput: RelNode): Option[TableScan] = {
+ snapshotInput match {
+ case tableScan: TableScan
+ => Some(tableScan)
+ // computed column on lookup table
+ case project: LogicalProject if
trimHep(project.getInput).isInstanceOf[TableScan]
+ => Some(trimHep(project.getInput).asInstanceOf[TableScan])
+ case _ => None
+ }
+ }
+
+ private def isTableSourceScan(relNode: RelNode): Boolean = {
+ relNode match {
+ case r: LogicalTableScan =>
+ val table = r.getTable
+ table match {
+ case _: LegacyTableSourceTable[Any] | _: TableSourceTable => true
+ case _ => false
+ }
+ case _: FlinkLogicalLegacyTableSourceScan | _:
FlinkLogicalTableSourceScan => true
+ case _ => false
+ }
+ }
+
+ private def isLookupTableSource(relNode: RelNode): Boolean = relNode match {
+ case scan: FlinkLogicalLegacyTableSourceScan =>
+ scan.tableSource.isInstanceOf[LookupableTableSource[_]]
+ case scan: FlinkLogicalTableSourceScan =>
+ scan.tableSource.isInstanceOf[LookupTableSource]
+ case scan: LogicalTableScan =>
+ scan.getTable match {
+ case table: LegacyTableSourceTable[_] =>
+ table.tableSource.isInstanceOf[LookupableTableSource[_]]
+ case table: TableSourceTable =>
+ table.tableSource.isInstanceOf[LookupTableSource]
+ case _ => false
+ }
+ case _ => false
+ }
+
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ protected def trimHep(node: RelNode): RelNode = {
Review comment:
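Both `StreamExecTemporalJoinRule` and `TemporalJoinRewriteWithUniqueKeyRule`
use this method indirectly, but `StreamExecTemporalJoinRule` runs in the Volcano
planner, whose inputs are `RelSubset`s rather than `HepRelVertex`es, so only
trimming the `HepRelVertex` wrapper is not enough there.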
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRuleTest.scala
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import
org.apache.flink.table.planner.plan.optimize.program.{FlinkChainedProgram,
FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE, StreamOptimizeContext}
+import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase, TableTestUtil}
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[LogicalCorrelateToJoinFromTemporalTableRule]].
+ */
+class LogicalCorrelateToJoinFromTemporalTableRuleTest extends TableTestBase {
Review comment:
Please also add tests for RIGHT JOIN and FULL JOIN.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]