This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new eb44ac01c99 [FLINK-29849][table-planner] Fix event time temporal join
on an upsert source may produce incorrect execution plan
eb44ac01c99 is described below
commit eb44ac01c9969cb22ab832b6b2155b109f015b06
Author: lincoln.lil <[email protected]>
AuthorDate: Wed Nov 2 16:37:45 2022 +0800
[FLINK-29849][table-planner] Fix event time temporal join on an upsert
source may produce incorrect execution plan
This closes #21219
---
.../logical/EventTimeTemporalJoinRewriteRule.java | 474 +++++++++++++++++++++
.../plan/rules/logical/FlinkFilterJoinRule.java | 2 +
.../planner/plan/utils/TemporalTableJoinUtil.java | 69 +++
.../logical/FlinkLogicalTableSourceScan.scala | 30 +-
.../stream/StreamPhysicalTemporalJoin.scala | 4 +-
.../plan/optimize/program/FlinkStreamProgram.scala | 19 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 2 +-
.../TemporalJoinRewriteWithUniqueKeyRule.scala | 7 +-
.../stream/StreamPhysicalTableSourceScanRule.scala | 4 +-
.../planner/plan/utils/TemporalJoinUtil.scala | 7 +-
...AssignerChangelogNormalizeTransposeRuleTest.xml | 8 +-
.../planner/plan/stream/sql/TableScanTest.xml | 8 +-
.../plan/stream/sql/join/TemporalJoinTest.xml | 38 ++
...signerChangelogNormalizeTransposeRuleTest.scala | 2 +
.../plan/stream/sql/join/TemporalJoinTest.scala | 64 ++-
.../runtime/stream/sql/TemporalJoinITCase.scala | 5 +-
.../temporal/TemporalRowTimeJoinOperatorTest.java | 77 ++++
17 files changed, 787 insertions(+), 33 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
new file mode 100644
index 00000000000..e5a5f57bb52
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
+
+import
org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Traverses an event time temporal table join {@link RelNode} tree and updates the right child so
+ * that the {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshotRequired property is set to
+ * true, which prevents it from generating a new StreamPhysicalChangelogNormalize later.
+ *
+ * <p>The match patterns are as follows (8 variants; the three `Calc` nodes are all optional):
+ *
+ * <pre>{@code
+ *      Join (event time temporal)
+ *      /       \
+ * RelNode     [Calc]
+ *                \
+ *              Snapshot
+ *                  \
+ *                 [Calc]
+ *                    \
+ *              WatermarkAssigner
+ *                      \
+ *                     [Calc]
+ *                        \
+ *                     TableScan
+ * }</pre>
+ *
+ * <p>Note: This rule can only be used in a separate {@link
org.apache.calcite.plan.hep.HepProgram}
+ * after `LOGICAL_REWRITE` rule sets are applied for now.
+ */
+public class EventTimeTemporalJoinRewriteRule
+ extends RelRule<EventTimeTemporalJoinRewriteRule.Config> {
+
+    /**
+     * One rule instance per {@link Config} pattern variant, in a fixed order. The set is meant to be
+     * run in its own HepProgram after the logical rewrite rules have been applied.
+     */
+    public static final RuleSet EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES =
+            RuleSets.ofList(
+                    Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_WMA_TS.toRule());
+
+    /** Creates the rule for one of the {@link Config} pattern variants. */
+    public EventTimeTemporalJoinRewriteRule(Config config) {
+        super(config);
+    }
+
+    /** Accepts the match only if the join carries an event time temporal join condition. */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final FlinkLogicalJoin temporalJoin = call.rel(0);
+        final RexNode condition = temporalJoin.getCondition();
+        if (condition == null) {
+            return false;
+        }
+        return TemporalTableJoinUtil.isEventTimeTemporalJoin(condition);
+    }
+
+    /**
+     * Replaces the right input of the matched join with a copy whose table source scan requires an
+     * event time snapshot.
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final FlinkLogicalJoin join = call.rel(0);
+        final FlinkLogicalRel joinRightChild = call.rel(2);
+        final RelNode newRight = transmitSnapshotRequirement(joinRightChild);
+        if (newRight == joinRightChild) {
+            // transmitSnapshotRequirement handed back the very same node: nothing to rewrite.
+            return;
+        }
+        // Join's explicit-inputs copy overload avoids building an input list with a helper from
+        // a third-party (shaded) collection library.
+        call.transformTo(
+                join.copy(
+                        join.getTraitSet(),
+                        join.getCondition(),
+                        join.getLeft(),
+                        newRight,
+                        join.getJoinType(),
+                        join.isSemiJoinDone()));
+    }
+
+    /**
+     * Rebuilds the right input of the temporal join so that the {@link
+     * FlinkLogicalTableSourceScan} at its bottom has {@code eventTimeSnapshotRequired} set to true.
+     *
+     * <p>When nothing below a node needs rewriting, that node is returned as the same instance.
+     * This lets callers detect a no-op by reference comparison.
+     *
+     * @param node a node on the right side of the join, possibly wrapped in a {@link HepRelVertex}
+     * @return the rewritten node, or {@code node} itself when no rewrite was necessary
+     * @throws TableException if a {@link FlinkLogicalCalc} on the way down carries a filter
+     */
+    private RelNode transmitSnapshotRequirement(RelNode node) {
+        if (node instanceof HepRelVertex) {
+            final RelNode current = ((HepRelVertex) node).getCurrentRel();
+            final RelNode rewritten = transmitSnapshotRequirement(current);
+            // Hand back the vertex itself when nothing changed underneath. Returning the
+            // unwrapped rel would make the parent's identity check report a change.
+            return rewritten == current ? node : rewritten;
+        }
+        if (node instanceof FlinkLogicalCalc) {
+            final FlinkLogicalCalc calc = (FlinkLogicalCalc) node;
+            // filter is not allowed because it will corrupt the version table
+            if (null != calc.getProgram().getCondition()) {
+                throw new TableException(
+                        "Filter is not allowed for right changelog input of event time temporal join,"
+                                + " it will corrupt the versioning of data. Please consider removing the filter before joining.");
+            }
+            final RelNode child = calc.getInput();
+            final RelNode newChild = transmitSnapshotRequirement(child);
+            if (newChild != child) {
+                return calc.copy(calc.getTraitSet(), newChild, calc.getProgram());
+            }
+            return calc;
+        }
+        if (node instanceof FlinkLogicalSnapshot) {
+            final FlinkLogicalSnapshot snapshot = (FlinkLogicalSnapshot) node;
+            assert isEventTime(snapshot.getPeriod().getType());
+            final RelNode child = snapshot.getInput();
+            final RelNode newChild = transmitSnapshotRequirement(child);
+            if (newChild != child) {
+                return snapshot.copy(snapshot.getTraitSet(), newChild, snapshot.getPeriod());
+            }
+            return snapshot;
+        }
+        if (node instanceof FlinkLogicalWatermarkAssigner) {
+            final FlinkLogicalWatermarkAssigner wma = (FlinkLogicalWatermarkAssigner) node;
+            final RelNode child = wma.getInput();
+            final RelNode newChild = transmitSnapshotRequirement(child);
+            if (newChild != child) {
+                return wma.copy(
+                        wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr());
+            }
+            return wma;
+        }
+        if (node instanceof FlinkLogicalTableSourceScan) {
+            final FlinkLogicalTableSourceScan ts = (FlinkLogicalTableSourceScan) node;
+            if (ts.eventTimeSnapshotRequired()) {
+                // already flagged (e.g. by an earlier firing of this rule): keep the same instance
+                return ts;
+            }
+            // update eventTimeSnapshotRequired to true
+            return ts.copy(ts.getTraitSet(), ts.relOptTable(), true);
+        }
+        return node;
+    }
+
+    /** Returns whether {@code period} is a time indicator type that marks event time. */
+    private boolean isEventTime(RelDataType period) {
+        return period instanceof TimeIndicatorRelDataType
+                && ((TimeIndicatorRelDataType) period).isEventTime();
+    }
+
+    /**
+     * Configuration for {@link EventTimeTemporalJoinRewriteRule}.
+     *
+     * <p>Each constant describes one shape of the right input of an event time temporal join. The
+     * shared operator tree is:
+     *
+     * <pre>{@code
+     *      Join (event time temporal)
+     *      /       \
+     * RelNode     [Calc]
+     *                \
+     *              Snapshot
+     *                  \
+     *                 [Calc]
+     *                    \
+     *              WatermarkAssigner
+     *                      \
+     *                     [Calc]
+     *                        \
+     *                     TableScan
+     * }</pre>
+     *
+     * <p>Each bracketed Calc may be present or absent, giving 2^3 = 8 variants. Each constant's
+     * name spells out the right-side nodes from top to bottom: CALC, SNAPSHOT, WMA for the
+     * watermark assigner, and TS for the table source scan. The left input may be any
+     * {@link FlinkLogicalRel}.
+     */
+    public interface Config extends RelRule.Config {
+        RelRule.Config JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_CALC_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalCalc.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalCalc.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r4 -> r4.operand(FlinkLogicalCalc.class).oneInput(
+                                        r5 -> r5.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs()))))))));
+
+        RelRule.Config JOIN_CALC_SNAPSHOT_CALC_WMA_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_CALC_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalCalc.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalCalc.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r4 -> r4.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs())))))));
+
+        RelRule.Config JOIN_CALC_SNAPSHOT_WMA_CALC_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalCalc.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalCalc.class).oneInput(
+                                        r4 -> r4.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs())))))));
+
+        RelRule.Config JOIN_CALC_SNAPSHOT_WMA_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalCalc.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs()))))));
+
+        RelRule.Config JOIN_SNAPSHOT_CALC_WMA_CALC_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_CALC_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalCalc.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalCalc.class).oneInput(
+                                        r4 -> r4.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs())))))));
+
+        RelRule.Config JOIN_SNAPSHOT_CALC_WMA_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_CALC_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalCalc.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs()))))));
+
+        RelRule.Config JOIN_SNAPSHOT_WMA_CALC_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalCalc.class).oneInput(
+                                        r3 -> r3.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs()))))));
+
+        RelRule.Config JOIN_SNAPSHOT_WMA_TS =
+                EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                join -> join.operand(FlinkLogicalJoin.class).inputs(
+                                        left -> left.operand(FlinkLogicalRel.class).anyInputs(),
+                                        right -> right.operand(FlinkLogicalSnapshot.class).oneInput(
+                                        r1 -> r1.operand(FlinkLogicalWatermarkAssigner.class).oneInput(
+                                        r2 -> r2.operand(FlinkLogicalTableSourceScan.class)
+                                                .noInputs())))));
+
+        /** Builds an {@link EventTimeTemporalJoinRewriteRule} for this configuration. */
+        @Override
+        default RelOptRule toRule() {
+            return new EventTimeTemporalJoinRewriteRule(this);
+        }
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
index b7f7f727da5..d1c58a000b4 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
@@ -450,6 +450,8 @@ public abstract class FlinkFilterJoinRule<C extends
FlinkFilterJoinRule.Config>
* Rule that tries to push filter expressions into a join condition and
into the inputs of the
* join.
*
+ * <p>Note: It never pushes a filter into an event time temporal join in
streaming.
+ *
* @see CoreRules#FILTER_INTO_JOIN
*/
public static class FlinkFilterIntoJoinRule
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
new file mode 100644
index 00000000000..38b5c705b0d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.util.Util;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Utility for temporal table join, intended to gradually replace the Scala class {@link
+ * TemporalJoinUtil}.
+ */
+public class TemporalTableJoinUtil {
+
+    /**
+     * Checks whether the given join condition is an event time temporal join condition. It
+     * matches either the initial form or the form produced by the temporal join rewrite.
+     *
+     * @param joinCondition the join condition to inspect
+     * @return {@code true} if any call in the condition tree is an initial row time temporal table
+     *     join condition or a rewritten row time temporal table join condition
+     */
+    public static boolean isEventTimeTemporalJoin(@Nonnull RexNode joinCondition) {
+        RexVisitor<Void> temporalConditionFinder =
+                new RexVisitorImpl<Void>(true) {
+                    @Override
+                    public Void visitCall(RexCall call) {
+                        if (isInitialRowTimeTemporalTableJoinCondition(call)
+                                || isRowTimeTemporalTableJoinCondition(call)) {
+                            // found an initial or a rewritten event time temporal join condition;
+                            // abort the traversal early
+                            throw new Util.FoundOne(call);
+                        }
+                        return super.visitCall(call);
+                    }
+                };
+        try {
+            joinCondition.accept(temporalConditionFinder);
+        } catch (Util.FoundOne found) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the given rexCall is a join condition on event time that has been rewritten.
+     */
+    public static boolean isRowTimeTemporalTableJoinCondition(RexCall call) {
+        // (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY, PRIMARY_KEY)
+        return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION()
+                && call.operands.size() == 5;
+    }
+
+    /** Checks whether the given rexCall is a not-yet-rewritten row time temporal join condition. */
+    private static boolean isInitialRowTimeTemporalTableJoinCondition(RexCall call) {
+        return call.getOperator() == TemporalJoinUtil.INITIAL_TEMPORAL_JOIN_CONDITION()
+                && TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call);
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 080005b6714..ba4f16dee43 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -46,20 +46,43 @@ class FlinkLogicalTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
hints: util.List[RelHint],
- relOptTable: TableSourceTable)
+ val relOptTable: TableSourceTable,
+ val eventTimeSnapshotRequired: Boolean = false)
extends TableScan(cluster, traitSet, hints, relOptTable)
with FlinkLogicalRel {
lazy val tableSource: DynamicTableSource = relOptTable.tableSource
+  /**
+   * Returns a copy of this scan that uses the given trait set and table. The copy's
+   * `eventTimeSnapshotRequired` flag is set to the given value, and this scan's hints are kept.
+   */
+  def copy(
+      traitSet: RelTraitSet,
+      tableSourceTable: TableSourceTable,
+      eventTimeSnapshotRequired: Boolean): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(
+      cluster, traitSet, getHints, tableSourceTable, eventTimeSnapshotRequired)
+  }
+
def copy(
traitSet: RelTraitSet,
tableSourceTable: TableSourceTable): FlinkLogicalTableSourceScan = {
- new FlinkLogicalTableSourceScan(cluster, traitSet, getHints,
tableSourceTable)
+ new FlinkLogicalTableSourceScan(
+ cluster,
+ traitSet,
+ getHints,
+ tableSourceTable,
+ eventTimeSnapshotRequired)
}
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]):
RelNode = {
- new FlinkLogicalTableSourceScan(cluster, traitSet, getHints, relOptTable)
+ new FlinkLogicalTableSourceScan(
+ cluster,
+ traitSet,
+ getHints,
+ relOptTable,
+ eventTimeSnapshotRequired)
}
override def deriveRowType(): RelDataType = {
@@ -79,6 +102,7 @@ class FlinkLogicalTableSourceScan(
.explainTerms(pw)
.item("fields", getRowType.getFieldNames.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(getHints),
!getHints.isEmpty)
+ .itemIf("eventTimeSnapshotRequired", "true", eventTimeSnapshotRequired)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
index e5f1450a478..011c4c63f78 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
@@ -23,7 +23,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
import
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.table.planner.plan.utils.{TemporalJoinUtil,
TemporalTableJoinUtil}
import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.util.Preconditions.checkState
@@ -166,7 +166,7 @@ class StreamPhysicalTemporalJoin(
}
if (
- TemporalJoinUtil.isRowTimeTemporalTableJoinCon(call) ||
+ TemporalTableJoinUtil.isRowTimeTemporalTableJoinCondition(call) ||
TemporalJoinUtil.isRowTimeTemporalFunctionJoinCon(call)
) {
leftTimeAttribute = Some(call.getOperands.get(0))
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index 4a3908af658..fb8b8ef1f0e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets
+import
org.apache.flink.table.planner.plan.rules.logical.EventTimeTemporalJoinRewriteRule
import org.apache.calcite.plan.hep.HepMatchOrder
@@ -251,10 +252,20 @@ object FlinkStreamProgram {
// logical rewrite
chainedProgram.addLast(
LOGICAL_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+ FlinkGroupProgramBuilder
+ .newBuilder[StreamOptimizeContext]
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+ .build())
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+
.add(EventTimeTemporalJoinRewriteRule.EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES)
+ .build())
.build()
)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index f46ef46a7c0..bc2b5a08e2f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -135,7 +135,7 @@ object FlinkStreamRuleSets {
/** RuleSet about filter */
private val FILTER_RULES: RuleSet = RuleSets.ofList(
- // push a filter into a join
+ // push a filter into a join (which isn't an event time temporal join)
FlinkFilterJoinRule.FILTER_INTO_JOIN,
// push filter into the children of a join
FlinkFilterJoinRule.JOIN_CONDITION_PUSH,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
index d3ef36ca150..74a17e5f55e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
@@ -64,6 +64,7 @@ class TemporalJoinRewriteWithUniqueKeyRule
val join = call.rel[FlinkLogicalJoin](0)
val leftInput = call.rel[FlinkLogicalRel](1)
val snapshot = call.rel[FlinkLogicalSnapshot](2)
+ val snapshotInput = call.rel[FlinkLogicalRel](3)
val joinCondition = join.getCondition
@@ -84,7 +85,8 @@ class TemporalJoinRewriteWithUniqueKeyRule
}
val rexBuilder = join.getCluster.getRexBuilder
- val primaryKeyInputRefs = extractPrimaryKeyInputRefs(leftInput,
snapshot, rexBuilder)
+ val primaryKeyInputRefs =
+ extractPrimaryKeyInputRefs(leftInput, snapshot, snapshotInput,
rexBuilder)
validateRightPrimaryKey(join, rightJoinKey, primaryKeyInputRefs)
if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
@@ -166,11 +168,12 @@ class TemporalJoinRewriteWithUniqueKeyRule
private def extractPrimaryKeyInputRefs(
leftInput: RelNode,
snapshot: FlinkLogicalSnapshot,
+ snapshotInput: FlinkLogicalRel,
rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
val rightFields = snapshot.getRowType.getFieldList
val fmq =
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
- val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput())
+ val upsertKeySet = fmq.getUpsertKeys(snapshotInput)
val fields = snapshot.getRowType.getFieldList
if (upsertKeySet != null && upsertKeySet.size() > 0) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index b26a71818b2..e7c390e07bd 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -69,8 +69,8 @@ class StreamPhysicalTableSourceScanRule
val resolvedSchema = table.contextResolvedTable.getResolvedSchema
if (
- isUpsertSource(resolvedSchema, table.tableSource) ||
- isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource,
tableConfig)
+ !scan.eventTimeSnapshotRequired && (isUpsertSource(resolvedSchema,
table.tableSource) ||
+ isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource,
tableConfig))
) {
// generate changelog normalize node
// primary key has been validated in CatalogSourceTable
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
index def8d430757..41c3dee6d5f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
@@ -284,7 +284,7 @@ object TemporalJoinUtil {
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
if (
- isRowTimeTemporalTableJoinCon(call) ||
+ TemporalTableJoinUtil.isRowTimeTemporalTableJoinCondition(call) ||
isRowTimeTemporalFunctionJoinCon(call)
) {
rowtimeJoin = true
@@ -297,11 +297,6 @@ object TemporalJoinUtil {
rowtimeJoin
}
- def isRowTimeTemporalTableJoinCon(rexCall: RexCall): Boolean = {
- // (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY,
PRIMARY_KEY)
- rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length
== 5
- }
-
def isRowTimeTemporalFunctionJoinCon(rexCall: RexCall): Boolean = {
// (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length
== 3
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index 3d4f6f2835a..32fc6188b9d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -220,11 +220,9 @@ Calc(select=[a, b, f], where=[f], changelogMode=[I])
: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS
ingestion_time, a, b], changelogMode=[I])
: +- TableSourceScan(table=[[default_catalog, default_database,
t1]], fields=[a, b, ingestion_time], changelogMode=[I])
+- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
- +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
- +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
- +- WatermarkAssigner(rowtime=[ingestion_time],
watermark=[ingestion_time], changelogMode=[I,UA,D])
- +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*)
AS ingestion_time, a, f], changelogMode=[I,UA,D])
- +- TableSourceScan(table=[[default_catalog,
default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f,
ingestion_time], changelogMode=[I,UA,D])
+ +- WatermarkAssigner(rowtime=[ingestion_time],
watermark=[ingestion_time], changelogMode=[I,UA,D])
+ +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS
ingestion_time, a, f], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, t2,
project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time],
changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 62c3334a959..b5445fd0754 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -308,11 +308,9 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS
EXPR$3], changelogMode=[
:- Exchange(distribution=[hash[currency]], changelogMode=[I])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I])
: +- TableSourceScan(table=[[default_catalog, default_database,
orders]], fields=[amount, currency, rowtime], changelogMode=[I])
- +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
- +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
- +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
+ +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
index 88a64981853..f809e5b9c24 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
@@ -750,6 +750,44 @@ Calc(select=[amount, currency, rowtime,
PROCTIME_MATERIALIZE(proctime) AS procti
+- Exchange(distribution=[hash[currency]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+- TableSourceScan(table=[[default_catalog,
default_database, RatesHistory]], fields=[currency, rate, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTemporalJoinUpsertSourceWithPostFilter">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[$3],
currency0=[$4], rate=[$5], valid=[$6], rowtime0=[$7])
++- LogicalFilter(condition=[=($6, _UTF-16LE'true')])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1, 2}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
+ : +- LogicalProject(amount=[$0], currency=[$1], rowtime=[$2],
proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
Orders]])
+ +- LogicalFilter(condition=[=($cor0.currency, $0)])
+ +- LogicalSnapshot(period=[$cor0.rowtime])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
UpsertRates]])
+
+== Optimized Physical Plan ==
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, currency0, rate, CAST(_UTF-16LE'true':VARCHAR(2147483647) CHARACTER
SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS valid,
CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0], where=[=(valid,
_UTF-16LE'true':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")],
changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(currency, currency0),
__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0),
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))],
select=[amount, currency, rowtime, proctime, currency0, rate, valid, rowtime0],
changelogMode=[I])
+ :- Exchange(distribution=[hash[currency]], changelogMode=[I])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I])
+ : +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime],
changelogMode=[I])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders]], fields=[amount, currency, rowtime], changelogMode=[I])
+ +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
UpsertRates]], fields=[currency, rate, valid, rowtime], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, currency0, rate, CAST('true' AS VARCHAR(2147483647)) AS valid,
CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0], where=[(valid = 'true')])
++- TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND
__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0),
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))],
select=[amount, currency, rowtime, proctime, currency0, rate, valid, rowtime0])
+ :- Exchange(distribution=[hash[currency]])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ : +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders]], fields=[amount, currency, rowtime])
+ +- Exchange(distribution=[hash[currency]])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
UpsertRates]], fields=[currency, rate, valid, rowtime])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
index c6b9b3b99fa..c02376ef48c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
@@ -194,6 +194,8 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest
extends TableTestBase
| 'changelog-mode' = 'I,UA,D'
|)
""".stripMargin)
+ // After FLINK-28988 is applied, the filter is no longer pushed down into the right input of
+ // the temporal join, which yields a more optimal plan (upsert mode without ChangelogNormalize).
val sql =
"""
|SELECT t1.a, t1.b, t2.f
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
index b0504ca82ff..32757cddfac 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.plan.stream.sql.join
-import org.apache.flink.table.api.{ExplainDetail, ValidationException}
+import org.apache.flink.table.api.{ExplainDetail, TableException,
ValidationException}
import org.apache.flink.table.planner.utils.{StreamTableTestUtil,
TableTestBase}
import org.junit.{Before, Test}
@@ -97,6 +97,21 @@ class TemporalJoinTest extends TableTestBase {
|)
""".stripMargin)
+ util.addTable("""
+ |CREATE TABLE UpsertRates (
+ | currency STRING,
+ | rate INT,
+ | valid VARCHAR,
+ | rowtime TIMESTAMP(3),
+ | WATERMARK FOR rowtime AS rowtime,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D',
+ | 'disable-lookup' = 'true'
+ |)
+ """.stripMargin)
+
util.addTable("""
|CREATE TABLE RatesOnly (
| currency STRING,
@@ -570,6 +585,53 @@ class TemporalJoinTest extends TableTestBase {
util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
}
+ @Test
+ def testTemporalJoinUpsertSourceWithPostFilter(): Unit = {
+ val sqlQuery = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "UpsertRates FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+ "ON o.currency = r.currency WHERE valid = 'true'"
+
+ util.verifyExplain(sqlQuery, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testTemporalJoinUpsertSourceWithPreFilter(): Unit = {
+ util.tableEnv.executeSql(s"""
+ |CREATE TEMPORARY VIEW V1 AS
+ |SELECT * FROM UpsertRates WHERE valid = 'true'
+ |""".stripMargin)
+
+ /**
+ * The problem is: a filter exists on an upsert changelog input (changelogMode=[I,UA,D]),
+ * so UPDATE_BEFORE (UB) messages must be present for correctness.
+ *
+ * Intermediate plan with modify kind:
+ * {{{
+ * +- TemporalJoin(joinType=[InnerJoin], ..., changelogMode=[I])
+ * :- Exchange(distribution=[hash[currency]], changelogMode=[I])
+ * : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I])
+ * : +- Calc(select=[amount, currency, rowtime, ...
changelogMode=[I])
+ * : +- TableSourceScan(table= Orders ... changelogMode=[I])
+ * +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+ * +- Calc(select=[currency, ... where=[=(valid, _UTF-16LE'true')],
changelogMode=[I,UA,D])
+ * +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I,UA,D])
+ * +- TableSourceScan(table= UpsertRates, ...
changelogMode=[I,UA,D])
+ * }}}
+ */
+
+ val sqlQuery = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "V1 FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+ "ON o.currency = r.currency"
+
+ expectExceptionThrown(
+ sqlQuery,
+ "Filter is not allowed for right changelog input of event time temporal
join, it will corrupt the versioning of data.",
+ classOf[TableException]
+ )
+ }
+
private def expectExceptionThrown(
sql: String,
keywords: String,
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 33d6224edfa..60aa95184b6 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -502,12 +502,13 @@ class TemporalJoinITCase(state: StateBackendMode) extends
StreamingWithStateTest
def testEventTimeTemporalJoinWithFilter(): Unit = {
tEnv.executeSql(
"CREATE VIEW v1 AS" +
- " SELECT * FROM versioned_currency_with_single_key WHERE rate < 115")
+ " SELECT * FROM versioned_currency_with_single_key")
val sql = "INSERT INTO rowtime_default_sink " +
" SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
" FROM orders_rowtime AS o " +
" JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r " +
- " ON o.currency = r.currency"
+ " ON o.currency = r.currency" +
+ " WHERE rate < 115"
tEnv.executeSql(sql).await()
val expected = List(
"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
index 0c243bf7e84..92bf6f28e54 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
@@ -193,6 +193,83 @@ public class TemporalRowTimeJoinOperatorTest extends
TemporalTimeJoinOperatorTes
testHarness.close();
}
+ @Test
+ public void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(2));
+ expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1",
"1a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(new Watermark(9));
+ expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
+ expectedOutput.add(new Watermark(13));
+
+ testRowTimeTemporalJoinOnUpsertSource(false, expectedOutput);
+ }
+
+ @Test
+ public void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
+ expectedOutput.add(new Watermark(2));
+ expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1",
"1a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(insertRecord(9L, "k2", "5a11", null, null, null));
+ expectedOutput.add(new Watermark(9));
+ expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
+ expectedOutput.add(new Watermark(13));
+
+ testRowTimeTemporalJoinOnUpsertSource(true, expectedOutput);
+ }
+
+ private void testRowTimeTemporalJoinOnUpsertSource(
+ boolean isLeftOuterJoin, List<Object> expectedOutput) throws
Exception {
+ TemporalRowTimeJoinOperator joinOperator =
+ new TemporalRowTimeJoinOperator(
+ rowType, rowType, joinCondition, 0, 0, 0, 0,
isLeftOuterJoin);
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData,
RowData> testHarness =
+ createTestHarness(joinOperator);
+
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+ testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
+
+ testHarness.processWatermark1(new Watermark(2));
+ testHarness.processWatermark2(new Watermark(2));
+
+ testHarness.processElement1(updateAfterRecord(3L, "k1", "1a3"));
+ testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
+
+ testHarness.processWatermark1(new Watermark(5));
+ testHarness.processWatermark2(new Watermark(5));
+
+ testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+ testHarness.processElement2(updateAfterRecord(7L, "k2", "2a5"));
+
+ testHarness.processWatermark1(new Watermark(8));
+ testHarness.processWatermark2(new Watermark(9));
+
+ testHarness.processElement1(insertRecord(9L, "k2", "5a11"));
+ testHarness.processElement1(insertRecord(11L, "k2", "5a12"));
+ testHarness.processElement2(deleteRecord(9L, "k2", "2a5"));
+ testHarness.processElement2(insertRecord(10L, "k2", "2a6"));
+
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData,
RowData>
createTestHarness(TemporalRowTimeJoinOperator
temporalJoinOperator) throws Exception {