This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new eb44ac01c99 [FLINK-29849][table-planner] Fix event time temporal join 
on an upsert source may produce incorrect execution plan
eb44ac01c99 is described below

commit eb44ac01c9969cb22ab832b6b2155b109f015b06
Author: lincoln.lil <[email protected]>
AuthorDate: Wed Nov 2 16:37:45 2022 +0800

    [FLINK-29849][table-planner] Fix event time temporal join on an upsert 
source may produce incorrect execution plan
    
    This closes #21219
---
 .../logical/EventTimeTemporalJoinRewriteRule.java  | 474 +++++++++++++++++++++
 .../plan/rules/logical/FlinkFilterJoinRule.java    |   2 +
 .../planner/plan/utils/TemporalTableJoinUtil.java  |  69 +++
 .../logical/FlinkLogicalTableSourceScan.scala      |  30 +-
 .../stream/StreamPhysicalTemporalJoin.scala        |   4 +-
 .../plan/optimize/program/FlinkStreamProgram.scala |  19 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   2 +-
 .../TemporalJoinRewriteWithUniqueKeyRule.scala     |   7 +-
 .../stream/StreamPhysicalTableSourceScanRule.scala |   4 +-
 .../planner/plan/utils/TemporalJoinUtil.scala      |   7 +-
 ...AssignerChangelogNormalizeTransposeRuleTest.xml |   8 +-
 .../planner/plan/stream/sql/TableScanTest.xml      |   8 +-
 .../plan/stream/sql/join/TemporalJoinTest.xml      |  38 ++
 ...signerChangelogNormalizeTransposeRuleTest.scala |   2 +
 .../plan/stream/sql/join/TemporalJoinTest.scala    |  64 ++-
 .../runtime/stream/sql/TemporalJoinITCase.scala    |   5 +-
 .../temporal/TemporalRowTimeJoinOperatorTest.java  |  77 ++++
 17 files changed, 787 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
new file mode 100644
index 00000000000..e5a5f57bb52
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
+
+import 
org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Traverses an event time temporal table join {@link RelNode} tree and updates the right child to
+ * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true, which will prevent
+ * it from generating a new StreamPhysicalChangelogNormalize later.
+ *
+ * <p>The match patterns are as follows (8 variants; the three `Calc` nodes are all optional):
+ *
+ * <pre>{@code
+ *    Join (event time temporal)
+ *      /       \
+ * RelNode     [Calc]
+ *               \
+ *             Snapshot
+ *                \
+ *              [Calc]
+ *                 \
+ *             WatermarkAssigner
+ *                  \
+ *                [Calc]
+ *                   \
+ *                TableScan
+ * }</pre>
+ *
+ * <p>Note: for now, this rule can only be used in a separate {@link
+ * org.apache.calcite.plan.hep.HepProgram} after the `LOGICAL_REWRITE` rule sets are applied.
+ */
+public class EventTimeTemporalJoinRewriteRule
+        extends RelRule<EventTimeTemporalJoinRewriteRule.Config> {
+
+    /**
+     * Rule set containing one rule per operand-tree variant declared in {@link Config}, i.e. one
+     * for each of the eight combinations of the three optional {@code Calc} nodes.
+     */
+    public static final RuleSet EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES =
+            RuleSets.ofList(
+                    Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(),
+                    Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(),
+                    Config.JOIN_SNAPSHOT_WMA_TS.toRule());
+
+    /**
+     * Creates the rule from the given configuration, which is handed to the {@link RelRule}
+     * super constructor.
+     *
+     * @param config the rule configuration
+     */
+    public EventTimeTemporalJoinRewriteRule(Config config) {
+        super(config);
+    }
+
+    /**
+     * Accepts the call only when the matched join is an event time temporal join.
+     *
+     * @param call the rule call; operand 0 is the {@link FlinkLogicalJoin}
+     * @return {@code true} if the join has a condition and {@link
+     *     TemporalTableJoinUtil#isEventTimeTemporalJoin} accepts it, {@code false} otherwise
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final FlinkLogicalJoin temporalJoin = call.rel(0);
+        final RexNode condition = temporalJoin.getCondition();
+        if (condition == null) {
+            // no condition at all, so this cannot be an event time temporal join
+            return false;
+        }
+        return TemporalTableJoinUtil.isEventTimeTemporalJoin(condition);
+    }
+
+    /**
+     * Replaces the matched join with a copy whose right input has been rewritten by {@link
+     * #transmitSnapshotRequirement(RelNode)}; the left input is reused as is.
+     *
+     * @param call the rule call; operand 0 is the join and operand 2 is the top node of the
+     *     join's right input
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final FlinkLogicalJoin temporalJoin = call.rel(0);
+        final FlinkLogicalRel rightRoot = call.rel(2);
+        final RelNode rewrittenRight = transmitSnapshotRequirement(rightRoot);
+        final RelNode rewrittenJoin =
+                temporalJoin.copy(
+                        temporalJoin.getTraitSet(),
+                        Lists.newArrayList(temporalJoin.getLeft(), rewrittenRight));
+        call.transformTo(rewrittenJoin);
+    }
+
+    /**
+     * Recursively rebuilds the given subtree so that a {@link FlinkLogicalTableSourceScan} reached
+     * through it is copied with {@code true} as the last {@code copy} argument
+     * (eventTimeSnapshotRequired).
+     *
+     * <p>{@link FlinkLogicalCalc}, {@link FlinkLogicalSnapshot} and {@link
+     * FlinkLogicalWatermarkAssigner} nodes are copied only when their rewritten input differs from
+     * the current one. A {@link HepRelVertex} is replaced by the rewritten form of its current rel.
+     * Any other kind of node is returned untouched.
+     *
+     * @param node root of the subtree to rewrite
+     * @return the rewritten subtree
+     * @throws TableException if a {@link FlinkLogicalCalc} on the path has a filter condition
+     */
+    private RelNode transmitSnapshotRequirement(RelNode node) {
+        if (node instanceof FlinkLogicalCalc) {
+            final FlinkLogicalCalc calcNode = (FlinkLogicalCalc) node;
+            // a filter is rejected here because it would corrupt the versioned table
+            if (calcNode.getProgram().getCondition() != null) {
+                throw new TableException(
+                        "Filter is not allowed for right changelog input of event time temporal join,"
+                                + " it will corrupt the versioning of data. Please consider removing the filter before joining.");
+            }
+
+            final RelNode input = calcNode.getInput();
+            final RelNode rewrittenInput = transmitSnapshotRequirement(input);
+            if (rewrittenInput == input) {
+                return calcNode;
+            }
+            return calcNode.copy(calcNode.getTraitSet(), rewrittenInput, calcNode.getProgram());
+        } else if (node instanceof FlinkLogicalSnapshot) {
+            final FlinkLogicalSnapshot snapshotNode = (FlinkLogicalSnapshot) node;
+            assert isEventTime(snapshotNode.getPeriod().getType());
+            final RelNode input = snapshotNode.getInput();
+            final RelNode rewrittenInput = transmitSnapshotRequirement(input);
+            if (rewrittenInput == input) {
+                return snapshotNode;
+            }
+            return snapshotNode.copy(
+                    snapshotNode.getTraitSet(), rewrittenInput, snapshotNode.getPeriod());
+        } else if (node instanceof HepRelVertex) {
+            // look through the planner's vertex wrapper and rewrite the rel it currently holds
+            return transmitSnapshotRequirement(((HepRelVertex) node).getCurrentRel());
+        } else if (node instanceof FlinkLogicalWatermarkAssigner) {
+            final FlinkLogicalWatermarkAssigner assigner = (FlinkLogicalWatermarkAssigner) node;
+            final RelNode input = assigner.getInput();
+            final RelNode rewrittenInput = transmitSnapshotRequirement(input);
+            if (rewrittenInput == input) {
+                return assigner;
+            }
+            return assigner.copy(
+                    assigner.getTraitSet(),
+                    rewrittenInput,
+                    assigner.rowtimeFieldIndex(),
+                    assigner.watermarkExpr());
+        } else if (node instanceof FlinkLogicalTableSourceScan) {
+            final FlinkLogicalTableSourceScan scan = (FlinkLogicalTableSourceScan) node;
+            // copy the scan with eventTimeSnapshotRequired set to true
+            return scan.copy(scan.getTraitSet(), scan.relOptTable(), true);
+        } else {
+            return node;
+        }
+    }
+
+    /**
+     * Tells whether the given type is an event time indicator.
+     *
+     * @param period the type to inspect
+     * @return {@code true} only if {@code period} is a {@link TimeIndicatorRelDataType} whose
+     *     {@code isEventTime()} is {@code true}
+     */
+    private boolean isEventTime(RelDataType period) {
+        return period instanceof TimeIndicatorRelDataType
+                && ((TimeIndicatorRelDataType) period).isEventTime();
+    }
+
+    /**
+     * Configuration for {@link EventTimeTemporalJoinRewriteRule}.
+     *
+     * <p>Operator tree:
+     *
+     * <pre>{@code
+     *    Join (event time temporal)
+     *      /       \
+     * RelNode     [Calc]
+     *               \
+     *             Snapshot
+     *                \
+     *              [Calc]
+     *                 \
+     *             WatermarkAssigner
+     *                  \
+     *                [Calc]
+     *                   \
+     *                TableScan
+     * }</pre>
+     *
+     * <p>8 variants:
+     *
+     * <ul>
+     *   <li>JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS
+     *   <li>JOIN_CALC_SNAPSHOT_CALC_WMA_TS
+     *   <li>JOIN_CALC_SNAPSHOT_WMA_CALC_TS
+     *   <li>JOIN_CALC_SNAPSHOT_WMA_TS
+     *   <li>JOIN_SNAPSHOT_CALC_WMA_CALC_TS
+     *   <li>JOIN_SNAPSHOT_CALC_WMA_TS
+     *   <li>JOIN_SNAPSHOT_WMA_CALC_TS
+     *   <li>JOIN_SNAPSHOT_WMA_TS
+     * </ul>
+     */
+    public interface Config extends RelRule.Config {
+        RelRule.Config JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS =
+                EMPTY.withDescription(
+                                
"EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_CALC_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalCalc
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalSnapshot
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalCalc
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalWatermarkAssigner
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 .oneInput(
+                                                                               
                                                                         r4 ->
+                                                                               
                                                                                
 r4.operand(
+                                                                               
                                                                                
                 FlinkLogicalCalc
+                                                                               
                                                                                
                         .class)
+                                                                               
                                                                                
         .oneInput(
+                                                                               
                                                                                
                 r5 ->
+                                                                               
                                                                                
                         r5.operand(
+                                                                               
                                                                                
                                         FlinkLogicalTableSourceScan
+                                                                               
                                                                                
                                                 .class)
+                                                                               
                                                                                
                                 .noInputs())))))));
+        RelRule.Config JOIN_CALC_SNAPSHOT_CALC_WMA_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_CALC_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalCalc
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalSnapshot
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalCalc
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalWatermarkAssigner
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 .oneInput(
+                                                                               
                                                                         r4 ->
+                                                                               
                                                                                
 r4.operand(
+                                                                               
                                                                                
                 FlinkLogicalTableSourceScan
+                                                                               
                                                                                
                         .class)
+                                                                               
                                                                                
         .noInputs()))))));
+
+        RelRule.Config JOIN_CALC_SNAPSHOT_WMA_CALC_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalCalc
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalSnapshot
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalWatermarkAssigner
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalCalc
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 .oneInput(
+                                                                               
                                                                         r4 ->
+                                                                               
                                                                                
 r4.operand(
+                                                                               
                                                                                
                 FlinkLogicalTableSourceScan
+                                                                               
                                                                                
                         .class)
+                                                                               
                                                                                
         .noInputs()))))));
+        RelRule.Config JOIN_CALC_SNAPSHOT_WMA_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_CALC_SNAPSHOT_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalCalc
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalSnapshot
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalWatermarkAssigner
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalTableSourceScan
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 
.noInputs())))));
+
+        RelRule.Config JOIN_SNAPSHOT_CALC_WMA_CALC_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_CALC_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalCalc
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalWatermarkAssigner
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalCalc
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 .oneInput(
+                                                                               
                                                                         r4 ->
+                                                                               
                                                                                
 r4.operand(
+                                                                               
                                                                                
                 FlinkLogicalTableSourceScan
+                                                                               
                                                                                
                         .class)
+                                                                               
                                                                                
         .noInputs()))))));
+        RelRule.Config JOIN_SNAPSHOT_CALC_WMA_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_CALC_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalCalc
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalWatermarkAssigner
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalTableSourceScan
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 
.noInputs())))));
+
+        RelRule.Config JOIN_SNAPSHOT_WMA_CALC_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_WMA_CALC")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalWatermarkAssigner
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalCalc
+                                                                               
                                                         .class)
+                                                                               
                                         .oneInput(
+                                                                               
                                                 r3 ->
+                                                                               
                                                         r3.operand(
+                                                                               
                                                                         
FlinkLogicalTableSourceScan
+                                                                               
                                                                                
 .class)
+                                                                               
                                                                 
.noInputs())))));
+
+        RelRule.Config JOIN_SNAPSHOT_WMA_TS =
+                
EMPTY.withDescription("EventTimeTemporalJoinRewriteRule_SNAPSHOT_WMA")
+                        .as(Config.class)
+                        .withOperandSupplier(
+                                joinTransform ->
+                                        joinTransform
+                                                
.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        left ->
+                                                                
left.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        right ->
+                                                                right.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 r1 ->
+                                                                               
         r1.operand(
+                                                                               
                         FlinkLogicalWatermarkAssigner
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         r2 ->
+                                                                               
                                 r2.operand(
+                                                                               
                                                 FlinkLogicalTableSourceScan
+                                                                               
                                                         .class)
+                                                                               
                                         .noInputs()))));
+
+        @Override
+        default RelOptRule toRule() {
+            return new EventTimeTemporalJoinRewriteRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
index b7f7f727da5..d1c58a000b4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
@@ -450,6 +450,8 @@ public abstract class FlinkFilterJoinRule<C extends 
FlinkFilterJoinRule.Config>
      * Rule that tries to push filter expressions into a join condition and 
into the inputs of the
      * join.
      *
+     * <p>Note: It never pushes a filter into an event time temporal join in 
streaming.
+     *
      * @see CoreRules#FILTER_INTO_JOIN
      */
     public static class FlinkFilterIntoJoinRule
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
new file mode 100644
index 00000000000..38b5c705b0d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.util.Util;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Utility for temporal table join which will gradually replace the Scala 
class {@link
+ * TemporalJoinUtil}.
+ */
+public class TemporalTableJoinUtil {
+
+    /**
+     * Check if the given join condition is an initial temporal join condition 
or a rewritten join
+     * condition on event time.
+     */
+    public static boolean isEventTimeTemporalJoin(@Nonnull RexNode 
joinCondition) {
+        RexVisitor<Void> temporalConditionFinder =
+                new RexVisitorImpl<Void>(true) {
+                    @Override
+                    public Void visitCall(RexCall call) {
+                        if ((call.getOperator()
+                                                == TemporalJoinUtil
+                                                        
.INITIAL_TEMPORAL_JOIN_CONDITION()
+                                        && 
TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call))
+                                || isRowTimeTemporalTableJoinCondition(call)) {
+                            // found an initial or rewritten event-time temporal join condition
+                            throw new Util.FoundOne(call);
+                        }
+                        return super.visitCall(call);
+                    }
+                };
+        try {
+            joinCondition.accept(temporalConditionFinder);
+        } catch (Util.FoundOne found) {
+            return true;
+        }
+        return false;
+    }
+
+    /** Check if the given rexCall is a rewritten join condition on event time. 
*/
+    public static boolean isRowTimeTemporalTableJoinCondition(RexCall call) {
+        // (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY, 
PRIMARY_KEY)
+        return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION()
+                && call.operands.size() == 5;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 080005b6714..ba4f16dee43 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -46,20 +46,43 @@ class FlinkLogicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     hints: util.List[RelHint],
-    relOptTable: TableSourceTable)
+    val relOptTable: TableSourceTable,
+    val eventTimeSnapshotRequired: Boolean = false)
   extends TableScan(cluster, traitSet, hints, relOptTable)
   with FlinkLogicalRel {
 
   lazy val tableSource: DynamicTableSource = relOptTable.tableSource
 
+  def copy(
+      traitSet: RelTraitSet,
+      tableSourceTable: TableSourceTable,
+      eventTimeSnapshotRequired: Boolean): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(
+      cluster,
+      traitSet,
+      getHints,
+      tableSourceTable,
+      eventTimeSnapshotRequired)
+  }
+
   def copy(
       traitSet: RelTraitSet,
       tableSourceTable: TableSourceTable): FlinkLogicalTableSourceScan = {
-    new FlinkLogicalTableSourceScan(cluster, traitSet, getHints, 
tableSourceTable)
+    new FlinkLogicalTableSourceScan(
+      cluster,
+      traitSet,
+      getHints,
+      tableSourceTable,
+      eventTimeSnapshotRequired)
   }
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new FlinkLogicalTableSourceScan(cluster, traitSet, getHints, relOptTable)
+    new FlinkLogicalTableSourceScan(
+      cluster,
+      traitSet,
+      getHints,
+      relOptTable,
+      eventTimeSnapshotRequired)
   }
 
   override def deriveRowType(): RelDataType = {
@@ -79,6 +102,7 @@ class FlinkLogicalTableSourceScan(
       .explainTerms(pw)
       .item("fields", getRowType.getFieldNames.mkString(", "))
       .itemIf("hints", RelExplainUtil.hintsToString(getHints), 
!getHints.isEmpty)
+      .itemIf("eventTimeSnapshotRequired", "true", eventTimeSnapshotRequired)
   }
 
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
index e5f1450a478..011c4c63f78 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
 import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.table.planner.plan.utils.{TemporalJoinUtil, 
TemporalTableJoinUtil}
 import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
 TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.flink.util.Preconditions.checkState
@@ -166,7 +166,7 @@ class StreamPhysicalTemporalJoin(
       }
 
       if (
-        TemporalJoinUtil.isRowTimeTemporalTableJoinCon(call) ||
+        TemporalTableJoinUtil.isRowTimeTemporalTableJoinCondition(call) ||
         TemporalJoinUtil.isRowTimeTemporalFunctionJoinCon(call)
       ) {
         leftTimeAttribute = Some(call.getOperands.get(0))
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index 4a3908af658..fb8b8ef1f0e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets
+import 
org.apache.flink.table.planner.plan.rules.logical.EventTimeTemporalJoinRewriteRule
 
 import org.apache.calcite.plan.hep.HepMatchOrder
 
@@ -251,10 +252,20 @@ object FlinkStreamProgram {
     // logical rewrite
     chainedProgram.addLast(
       LOGICAL_REWRITE,
-      FlinkHepRuleSetProgramBuilder.newBuilder
-        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
-        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-        .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+      FlinkGroupProgramBuilder
+        .newBuilder[StreamOptimizeContext]
+        .addProgram(
+          FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+            .build())
+        .addProgram(
+          FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            
.add(EventTimeTemporalJoinRewriteRule.EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES)
+            .build())
         .build()
     )
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index f46ef46a7c0..bc2b5a08e2f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -135,7 +135,7 @@ object FlinkStreamRuleSets {
 
   /** RuleSet about filter */
   private val FILTER_RULES: RuleSet = RuleSets.ofList(
-    // push a filter into a join
+    // push a filter into a join (which isn't an event time temporal join)
     FlinkFilterJoinRule.FILTER_INTO_JOIN,
     // push filter into the children of a join
     FlinkFilterJoinRule.JOIN_CONDITION_PUSH,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
index d3ef36ca150..74a17e5f55e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
@@ -64,6 +64,7 @@ class TemporalJoinRewriteWithUniqueKeyRule
     val join = call.rel[FlinkLogicalJoin](0)
     val leftInput = call.rel[FlinkLogicalRel](1)
     val snapshot = call.rel[FlinkLogicalSnapshot](2)
+    val snapshotInput = call.rel[FlinkLogicalRel](3)
 
     val joinCondition = join.getCondition
 
@@ -84,7 +85,8 @@ class TemporalJoinRewriteWithUniqueKeyRule
             }
 
           val rexBuilder = join.getCluster.getRexBuilder
-          val primaryKeyInputRefs = extractPrimaryKeyInputRefs(leftInput, 
snapshot, rexBuilder)
+          val primaryKeyInputRefs =
+            extractPrimaryKeyInputRefs(leftInput, snapshot, snapshotInput, 
rexBuilder)
           validateRightPrimaryKey(join, rightJoinKey, primaryKeyInputRefs)
 
           if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
@@ -166,11 +168,12 @@ class TemporalJoinRewriteWithUniqueKeyRule
   private def extractPrimaryKeyInputRefs(
       leftInput: RelNode,
       snapshot: FlinkLogicalSnapshot,
+      snapshotInput: FlinkLogicalRel,
       rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
     val rightFields = snapshot.getRowType.getFieldList
     val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
 
-    val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput())
+    val upsertKeySet = fmq.getUpsertKeys(snapshotInput)
     val fields = snapshot.getRowType.getFieldList
 
     if (upsertKeySet != null && upsertKeySet.size() > 0) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index b26a71818b2..e7c390e07bd 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -69,8 +69,8 @@ class StreamPhysicalTableSourceScanRule
     val resolvedSchema = table.contextResolvedTable.getResolvedSchema
 
     if (
-      isUpsertSource(resolvedSchema, table.tableSource) ||
-      isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, 
tableConfig)
+      !scan.eventTimeSnapshotRequired && (isUpsertSource(resolvedSchema, 
table.tableSource) ||
+        isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, 
tableConfig))
     ) {
       // generate changelog normalize node
       // primary key has been validated in CatalogSourceTable
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
index def8d430757..41c3dee6d5f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
@@ -284,7 +284,7 @@ object TemporalJoinUtil {
     val visitor = new RexVisitorImpl[Unit](true) {
       override def visitCall(call: RexCall): Unit = {
         if (
-          isRowTimeTemporalTableJoinCon(call) ||
+          TemporalTableJoinUtil.isRowTimeTemporalTableJoinCondition(call) ||
           isRowTimeTemporalFunctionJoinCon(call)
         ) {
           rowtimeJoin = true
@@ -297,11 +297,6 @@ object TemporalJoinUtil {
     rowtimeJoin
   }
 
-  def isRowTimeTemporalTableJoinCon(rexCall: RexCall): Boolean = {
-    // (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, LEFT_KEY, RIGHT_KEY, 
PRIMARY_KEY)
-    rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 5
-  }
-
   def isRowTimeTemporalFunctionJoinCon(rexCall: RexCall): Boolean = {
     // (LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)
     rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length 
== 3
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index 3d4f6f2835a..32fc6188b9d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -220,11 +220,9 @@ Calc(select=[a, b, f], where=[f], changelogMode=[I])
    :     +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
ingestion_time, a, b], changelogMode=[I])
    :        +- TableSourceScan(table=[[default_catalog, default_database, 
t1]], fields=[a, b, ingestion_time], changelogMode=[I])
    +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
-      +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
-         +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
-            +- WatermarkAssigner(rowtime=[ingestion_time], 
watermark=[ingestion_time], changelogMode=[I,UA,D])
-               +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) 
AS ingestion_time, a, f], changelogMode=[I,UA,D])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
ingestion_time], changelogMode=[I,UA,D])
+      +- WatermarkAssigner(rowtime=[ingestion_time], 
watermark=[ingestion_time], changelogMode=[I,UA,D])
+         +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
ingestion_time, a, f], changelogMode=[I,UA,D])
+            +- TableSourceScan(table=[[default_catalog, default_database, t2, 
project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time], 
changelogMode=[I,UA,D])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 62c3334a959..b5445fd0754 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -308,11 +308,9 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS 
EXPR$3], changelogMode=[
    :- Exchange(distribution=[hash[currency]], changelogMode=[I])
    :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[I])
    :     +- TableSourceScan(table=[[default_catalog, default_database, 
orders]], fields=[amount, currency, rowtime], changelogMode=[I])
-   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
-      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
-         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[UA,D])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
+   +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[UA,D])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
index 88a64981853..f809e5b9c24 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
@@ -750,6 +750,44 @@ Calc(select=[amount, currency, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS procti
             +- Exchange(distribution=[hash[currency]])
                +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
                   +- TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTemporalJoinUpsertSourceWithPostFilter">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[$3], 
currency0=[$4], rate=[$5], valid=[$6], rowtime0=[$7])
++- LogicalFilter(condition=[=($6, _UTF-16LE'true')])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1, 2}])
+      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
+      :  +- LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], 
proctime=[PROCTIME()])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
Orders]])
+      +- LogicalFilter(condition=[=($cor0.currency, $0)])
+         +- LogicalSnapshot(period=[$cor0.rowtime])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
UpsertRates]])
+
+== Optimized Physical Plan ==
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, currency0, rate, CAST(_UTF-16LE'true':VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS valid, 
CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0], where=[=(valid, 
_UTF-16LE'true':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")], 
changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(currency, currency0), 
__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, proctime, currency0, rate, valid, rowtime0], 
changelogMode=[I])
+   :- Exchange(distribution=[hash[currency]], changelogMode=[I])
+   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[I])
+   :     +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], 
changelogMode=[I])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Orders]], fields=[amount, currency, rowtime], changelogMode=[I])
+   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[I,UA,D])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
UpsertRates]], fields=[currency, rate, valid, rowtime], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, currency0, rate, CAST('true' AS VARCHAR(2147483647)) AS valid, 
CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0], where=[(valid = 'true')])
++- TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND 
__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, proctime, currency0, rate, valid, rowtime0])
+   :- Exchange(distribution=[hash[currency]])
+   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+   :     +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Orders]], fields=[amount, currency, rowtime])
+   +- Exchange(distribution=[hash[currency]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
UpsertRates]], fields=[currency, rate, valid, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
index c6b9b3b99fa..c02376ef48c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
@@ -194,6 +194,8 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest 
extends TableTestBase
                     | 'changelog-mode' = 'I,UA,D'
                     |)
       """.stripMargin)
+    // After FLINK-28988 is applied, the filter will not be pushed down into the left 
input of the join, yielding
+    // a more optimal plan (upsert mode without ChangelogNormalize).
     val sql =
       """
         |SELECT t1.a, t1.b, t2.f
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
index b0504ca82ff..32757cddfac 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.stream.sql.join
 
-import org.apache.flink.table.api.{ExplainDetail, ValidationException}
+import org.apache.flink.table.api.{ExplainDetail, TableException, 
ValidationException}
 import org.apache.flink.table.planner.utils.{StreamTableTestUtil, 
TableTestBase}
 
 import org.junit.{Before, Test}
@@ -97,6 +97,21 @@ class TemporalJoinTest extends TableTestBase {
                     |)
       """.stripMargin)
 
+    util.addTable("""
+                    |CREATE TABLE UpsertRates (
+                    | currency STRING,
+                    | rate INT,
+                    | valid VARCHAR,
+                    | rowtime TIMESTAMP(3),
+                    | WATERMARK FOR rowtime AS rowtime,
+                    | PRIMARY KEY(currency) NOT ENFORCED
+                    |) WITH (
+                    | 'connector' = 'values',
+                    | 'changelog-mode' = 'I,UA,D',
+                    | 'disable-lookup' = 'true'
+                    |)
+      """.stripMargin)
+
     util.addTable("""
                     |CREATE TABLE RatesOnly (
                     | currency STRING,
@@ -570,6 +585,53 @@ class TemporalJoinTest extends TableTestBase {
     util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testTemporalJoinUpsertSourceWithPostFilter(): Unit = {
+    val sqlQuery = "SELECT * " +
+      "FROM Orders AS o JOIN " +
+      "UpsertRates FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+      "ON o.currency = r.currency WHERE valid = 'true'"
+
+    util.verifyExplain(sqlQuery, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testTemporalJoinUpsertSourceWithPreFilter(): Unit = {
+    util.tableEnv.executeSql(s"""
+                                |CREATE TEMPORARY VIEW V1 AS
+                                |SELECT * FROM UpsertRates WHERE valid = 'true'
+                                |""".stripMargin)
+
+    /**
+     * The problem is: there exists a filter on an upsert changelog 
input (changelogMode=[I,UA,D]),
+     * but the UB message must exist for correctness.
+     *
+     * Intermediate plan with modify kind:
+     * {{{
+     * +- TemporalJoin(joinType=[InnerJoin], ..., changelogMode=[I])
+     *    :- Exchange(distribution=[hash[currency]], changelogMode=[I])
+     *      : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[I])
+     *        : +- Calc(select=[amount, currency, rowtime, ... 
changelogMode=[I])
+     *          : +- TableSourceScan(table= Orders ... changelogMode=[I])
+     *    +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+     *      +- Calc(select=[currency, ... where=[=(valid, _UTF-16LE'true')], 
changelogMode=[I,UA,D])
+     *        +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[I,UA,D])
+     *          +- TableSourceScan(table= UpsertRates, ... 
changelogMode=[I,UA,D])
+     * }}}
+     */
+
+    val sqlQuery = "SELECT * " +
+      "FROM Orders AS o JOIN " +
+      "V1 FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+      "ON o.currency = r.currency"
+
+    expectExceptionThrown(
+      sqlQuery,
+      "Filter is not allowed for right changelog input of event time temporal 
join, it will corrupt the versioning of data.",
+      classOf[TableException]
+    )
+  }
+
   private def expectExceptionThrown(
       sql: String,
       keywords: String,
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 33d6224edfa..60aa95184b6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -502,12 +502,13 @@ class TemporalJoinITCase(state: StateBackendMode) extends 
StreamingWithStateTest
   def testEventTimeTemporalJoinWithFilter(): Unit = {
     tEnv.executeSql(
       "CREATE VIEW v1 AS" +
-        " SELECT * FROM versioned_currency_with_single_key WHERE rate < 115")
+        " SELECT * FROM versioned_currency_with_single_key")
     val sql = "INSERT INTO rowtime_default_sink " +
       " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
       " FROM orders_rowtime AS o " +
       " JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r " +
-      " ON o.currency = r.currency"
+      " ON o.currency = r.currency" +
+      " WHERE rate < 115"
     tEnv.executeSql(sql).await()
     val expected = List(
       "1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
index 0c243bf7e84..92bf6f28e54 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
@@ -193,6 +193,83 @@ public class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTes
         testHarness.close();
     }
 
+    @Test
+    public void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(2));
+        expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
+        expectedOutput.add(new Watermark(5));
+        expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
+        expectedOutput.add(new Watermark(8));
+        expectedOutput.add(new Watermark(9));
+        expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
+        expectedOutput.add(new Watermark(13));
+
+        testRowTimeTemporalJoinOnUpsertSource(false, expectedOutput);
+    }
+
+    @Test
+    public void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception {
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
+        expectedOutput.add(new Watermark(2));
+        expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
+        expectedOutput.add(new Watermark(5));
+        expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
+        expectedOutput.add(new Watermark(8));
+        expectedOutput.add(insertRecord(9L, "k2", "5a11", null, null, null));
+        expectedOutput.add(new Watermark(9));
+        expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
+        expectedOutput.add(new Watermark(13));
+
+        testRowTimeTemporalJoinOnUpsertSource(true, expectedOutput);
+    }
+
+    private void testRowTimeTemporalJoinOnUpsertSource(
+            boolean isLeftOuterJoin, List<Object> expectedOutput) throws 
Exception {
+        TemporalRowTimeJoinOperator joinOperator =
+                new TemporalRowTimeJoinOperator(
+                        rowType, rowType, joinCondition, 0, 0, 0, 0, 
isLeftOuterJoin);
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData> testHarness =
+                createTestHarness(joinOperator);
+
+        testHarness.open();
+
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+        testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
+
+        testHarness.processWatermark1(new Watermark(2));
+        testHarness.processWatermark2(new Watermark(2));
+
+        testHarness.processElement1(updateAfterRecord(3L, "k1", "1a3"));
+        testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
+
+        testHarness.processWatermark1(new Watermark(5));
+        testHarness.processWatermark2(new Watermark(5));
+
+        testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+        testHarness.processElement2(updateAfterRecord(7L, "k2", "2a5"));
+
+        testHarness.processWatermark1(new Watermark(8));
+        testHarness.processWatermark2(new Watermark(9));
+
+        testHarness.processElement1(insertRecord(9L, "k2", "5a11"));
+        testHarness.processElement1(insertRecord(11L, "k2", "5a12"));
+        testHarness.processElement2(deleteRecord(9L, "k2", "2a5"));
+        testHarness.processElement2(insertRecord(10L, "k2", "2a6"));
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
     private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData>
             createTestHarness(TemporalRowTimeJoinOperator 
temporalJoinOperator) throws Exception {
 

Reply via email to