This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 73993fc55e3 [FLINK-38885][table] Migrate `StreamPhysicalJoinRuleBase` 
and its children
73993fc55e3 is described below

commit 73993fc55e379d502c14f154a9bcb587a6539927
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Jan 24 09:49:07 2026 +0100

    [FLINK-38885][table] Migrate `StreamPhysicalJoinRuleBase` and its children
---
 .../stream/StreamPhysicalIntervalJoinRule.java     | 166 +++++++++++++++++++++
 .../physical/stream/StreamPhysicalJoinRule.java    | 123 +++++++++++++++
 .../stream/StreamPhysicalJoinRuleBase.java         | 140 +++++++++++++++++
 .../stream/StreamPhysicalTemporalJoinRule.java     | 107 +++++++++++++
 .../stream/StreamPhysicalIntervalJoinRule.scala    | 122 ---------------
 .../physical/stream/StreamPhysicalJoinRule.scala   |  91 -----------
 .../stream/StreamPhysicalJoinRuleBase.scala        | 109 --------------
 .../stream/StreamPhysicalTemporalJoinRule.scala    |  72 ---------
 .../planner/plan/utils/IntervalJoinUtil.scala      |   2 +-
 9 files changed, 537 insertions(+), 395 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
new file mode 100644
index 00000000000..6354f04de07
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin;
+import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.immutables.value.Value;
+
+import java.util.Collection;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+/**
+ * Rule that converts non-SEMI/ANTI {@link FlinkLogicalJoin} with window 
bounds in join condition to
+ * {@link StreamPhysicalIntervalJoin}.
+ */
+@Value.Enclosing
+public class StreamPhysicalIntervalJoinRule
+        extends StreamPhysicalJoinRuleBase<
+                
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig> {
+    public static final RelOptRule INSTANCE = 
StreamPhysicalIntervalJoinRuleConfig.DEFAULT.toRule();
+
+    public StreamPhysicalIntervalJoinRule(StreamPhysicalIntervalJoinRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+
+        if (!IntervalJoinUtil.satisfyIntervalJoin(join)) {
+            return false;
+        }
+
+        // validate the join
+        IntervalJoinSpec.WindowBounds windowBounds = 
extractWindowBounds(join).f0.get();
+
+        if (windowBounds.isEventTime()) {
+            RelDataType leftTimeAttributeType =
+                    join.getLeft()
+                            .getRowType()
+                            .getFieldList()
+                            .get(windowBounds.getLeftTimeIdx())
+                            .getType();
+            RelDataType rightTimeAttributeType =
+                    join.getRight()
+                            .getRowType()
+                            .getFieldList()
+                            .get(windowBounds.getRightTimeIdx())
+                            .getType();
+            if (leftTimeAttributeType.getSqlTypeName() != 
rightTimeAttributeType.getSqlTypeName()) {
+                throw new ValidationException(
+                        String.format(
+                                "Interval join with rowtime attribute requires 
same rowtime types,"
+                                        + " but the types are %s and %s.",
+                                leftTimeAttributeType, 
rightTimeAttributeType));
+            }
+        } else {
+            // Check that no event-time attributes are in the input, because the
+            // processing-time window join does not correctly hold back watermarks.
+            // We rely on projection pushdown to remove unused attributes before
+            // the join.
+            RelDataType joinRowType = join.getRowType();
+            boolean containsRowTime =
+                    joinRowType.getFieldList().stream()
+                            .anyMatch(f -> 
FlinkTypeFactory.isRowtimeIndicatorType(f.getType()));
+            if (containsRowTime) {
+                throw new TableException(
+                        "Interval join with proctime attribute requires no 
event-time attributes are in the "
+                                + "join inputs.");
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public Collection<Integer> computeJoinLeftKeys(FlinkLogicalJoin join) {
+        Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+                extractWindowBounds(join);
+        return join.analyzeCondition().leftKeys.stream()
+                .filter(k -> tuple2.f0.get().getLeftTimeIdx() != k)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<Integer> computeJoinRightKeys(FlinkLogicalJoin join) {
+        Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+                extractWindowBounds(join);
+        return join.analyzeCondition().rightKeys.stream()
+                .filter(k -> tuple2.f0.get().getRightTimeIdx() != k)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public FlinkRelNode transform(
+            FlinkLogicalJoin join,
+            FlinkRelNode leftInput,
+            Function<RelNode, RelNode> leftConversion,
+            FlinkRelNode rightInput,
+            Function<RelNode, RelNode> rightConversion,
+            RelTraitSet providedTraitSet) {
+        Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+                extractWindowBounds(join);
+        return new StreamPhysicalIntervalJoin(
+                join.getCluster(),
+                providedTraitSet,
+                leftConversion.apply(leftInput),
+                rightConversion.apply(rightInput),
+                join.getJoinType(),
+                join.getCondition(),
+                tuple2.f1.getOrElse(() -> 
join.getCluster().getRexBuilder().makeLiteral(true)),
+                tuple2.f0.get());
+    }
+
+    /** Configuration for {@link StreamPhysicalIntervalJoinRule}. */
+    @Value.Immutable
+    public interface StreamPhysicalIntervalJoinRuleConfig
+            extends StreamPhysicalJoinRuleBaseRuleConfig {
+        StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig DEFAULT =
+                ImmutableStreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
+                        .builder()
+                        .build()
+                        .withOperandSupplier(StreamPhysicalJoinRuleBaseRuleConfig.OPERAND_TRANSFORM)
+                        .withDescription("StreamPhysicalIntervalJoinRule")
+                        .as(
+                                StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
+                                        .class);
+
+        @Override
+        default StreamPhysicalIntervalJoinRule toRule() {
+            return new StreamPhysicalIntervalJoinRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
new file mode 100644
index 00000000000..66be2a24096
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+import java.util.function.Function;
+
+/**
+ * Rule that converts {@link FlinkLogicalJoin} without window bounds in join 
condition to {@link
+ * StreamPhysicalJoin}.
+ */
+@Value.Enclosing
+public class StreamPhysicalJoinRule
+        extends 
StreamPhysicalJoinRuleBase<StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig> 
{
+    public static final StreamPhysicalJoinRule INSTANCE =
+            StreamPhysicalJoinRuleConfig.DEFAULT.toRule();
+
+    public 
StreamPhysicalJoinRule(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final FlinkLogicalJoin join = call.rel(0);
+        final FlinkLogicalRel left = call.rel(1);
+        final FlinkLogicalRel right = call.rel(2);
+
+        if (!JoinUtil.satisfyRegularJoin(join, left, right)) {
+            return false;
+        }
+
+        // validate the join
+        if (left instanceof FlinkLogicalSnapshot) {
+            throw new TableException(
+                    "Temporal table join only support apply FOR SYSTEM_TIME AS 
OF on the right table.");
+        }
+
+        // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in the physical
+        // phase in the case where a temporal join falls back to a regular
+        // join.
+        Preconditions.checkState(
+                
!TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition()));
+
+        // Time attributes must not be in the output type of a regular join
+        boolean timeAttrInOutput =
+                join.getRowType().getFieldList().stream()
+                        .anyMatch(f -> 
FlinkTypeFactory.isTimeIndicatorType(f.getType()));
+        Preconditions.checkState(!timeAttrInOutput);
+
+        // Join condition must not access time attributes
+        boolean remainingPredsAccessTime =
+                JoinUtil.accessesTimeAttribute(
+                        join.getCondition(), 
JoinUtil.combineJoinInputsRowType(join));
+        Preconditions.checkState(!remainingPredsAccessTime);
+        return true;
+    }
+
+    @Override
+    public FlinkRelNode transform(
+            FlinkLogicalJoin join,
+            FlinkRelNode leftInput,
+            Function<RelNode, RelNode> leftConversion,
+            FlinkRelNode rightInput,
+            Function<RelNode, RelNode> rightConversion,
+            RelTraitSet providedTraitSet) {
+        return new StreamPhysicalJoin(
+                join.getCluster(),
+                providedTraitSet,
+                leftConversion.apply(leftInput),
+                rightConversion.apply(rightInput),
+                join.getCondition(),
+                join.getJoinType(),
+                join.getHints());
+    }
+
+    /** Configuration for {@link StreamPhysicalJoinRule}. */
+    @Value.Immutable
+    public interface StreamPhysicalJoinRuleConfig
+            extends 
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig {
+        StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig DEFAULT =
+                
ImmutableStreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(OPERAND_TRANSFORM)
+                        .withDescription("StreamPhysicalJoinRule")
+                        
.as(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.class);
+
+        @Override
+        default StreamPhysicalJoinRule toRule() {
+            return new StreamPhysicalJoinRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
new file mode 100644
index 00000000000..339ad280d04
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.immutables.value.Value;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+import scala.Option;
+
+/**
+ * Base implementation for rules that match stream-stream joins, including regular stream joins,
+ * interval joins and temporal joins.
+ */
+@Value.Enclosing
+public abstract class StreamPhysicalJoinRuleBase<
+                T extends 
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig>
+        extends 
RelRule<StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig> {
+
+    protected StreamPhysicalJoinRuleBase(T config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        FlinkLogicalRel left = call.rel(1);
+        FlinkLogicalRel right = call.rel(2);
+        FlinkRelNode newJoin =
+                transform(
+                        join,
+                        left,
+                        leftInput -> convertInput(leftInput, 
computeJoinLeftKeys(join)),
+                        right,
+                        rightInput -> convertInput(rightInput, 
computeJoinRightKeys(join)),
+                        
join.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()));
+        call.transformTo(newJoin);
+    }
+
+    protected Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> 
extractWindowBounds(
+            FlinkLogicalJoin join) {
+        TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join);
+        return JavaScalaConversionUtil.toJava(
+                IntervalJoinUtil.extractWindowBoundsFromPredicate(
+                        join.getCondition(),
+                        join.getLeft().getRowType().getFieldCount(),
+                        join.getRowType(),
+                        join.getCluster().getRexBuilder(),
+                        tableConfig,
+                        ShortcutUtils.unwrapClassLoader(join)));
+    }
+
+    private RelNode convertInput(RelNode input, Collection<Integer> columns) {
+        RelTraitSet requiredTraitSet = toHashTraitByColumns(columns, 
input.getTraitSet());
+        return RelOptRule.convert(input, requiredTraitSet);
+    }
+
+    private RelTraitSet toHashTraitByColumns(
+            Collection<Integer> columns, RelTraitSet inputTraitSet) {
+        FlinkRelDistribution distribution =
+                columns.isEmpty()
+                        ? FlinkRelDistribution.SINGLETON()
+                        : FlinkRelDistribution.hash(
+                                
columns.stream().mapToInt(Integer::intValue).toArray(), true);
+        return 
inputTraitSet.replace(FlinkConventions.STREAM_PHYSICAL()).replace(distribution);
+    }
+
+    public Collection<Integer> computeJoinLeftKeys(FlinkLogicalJoin join) {
+        return join.analyzeCondition().leftKeys;
+    }
+
+    public Collection<Integer> computeJoinRightKeys(FlinkLogicalJoin join) {
+        return join.analyzeCondition().rightKeys;
+    }
+
+    public abstract FlinkRelNode transform(
+            FlinkLogicalJoin join,
+            FlinkRelNode leftInput,
+            Function<RelNode, RelNode> leftConversion,
+            FlinkRelNode rightInput,
+            Function<RelNode, RelNode> rightConversion,
+            RelTraitSet providedTraitSet);
+
+    /** Configuration for {@link StreamPhysicalJoinRuleBase}. */
+    @Value.Immutable
+    public interface StreamPhysicalJoinRuleBaseRuleConfig extends RelRule.Config {
+        RelRule.OperandTransform OPERAND_TRANSFORM =
+                b0 ->
+                        b0.operand(FlinkLogicalJoin.class)
+                                .inputs(
+                                        b1 -> b1.operand(FlinkLogicalRel.class).anyInputs(),
+                                        b2 -> b2.operand(FlinkLogicalRel.class).anyInputs());
+        StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig DEFAULT =
+                ImmutableStreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(OPERAND_TRANSFORM)
+                        .withDescription("StreamPhysicalJoinRuleBase")
+                        .as(StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.class);
+
+        @Override
+        default StreamPhysicalJoinRuleBase toRule() {
+            throw new UnsupportedOperationException("Abstract base rule cannot be instantiated.");
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
new file mode 100644
index 00000000000..4c7877a0c94
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+import java.util.function.Function;
+
+/**
+ * Rule that matches a temporal join node and converts it to {@link StreamPhysicalTemporalJoin}.
+ * A temporal join node is a {@link FlinkLogicalJoin} containing {@code TEMPORAL_JOIN_CONDITION}.
+ */
+@Value.Enclosing
+public class StreamPhysicalTemporalJoinRule
+        extends StreamPhysicalJoinRuleBase<
+                
StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig> {
+    public static final StreamPhysicalTemporalJoinRule INSTANCE =
+            StreamPhysicalTemporalJoinRuleConfig.DEFAULT.toRule();
+
+    public StreamPhysicalTemporalJoinRule(StreamPhysicalTemporalJoinRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        if (!TemporalJoinUtil.satisfyTemporalJoin(join)) {
+            return false;
+        }
+
+        // validate the join
+        // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase.
+        Preconditions.checkState(
+                
!TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition()));
+        return true;
+    }
+
+    @Override
+    public FlinkRelNode transform(
+            FlinkLogicalJoin join,
+            FlinkRelNode leftInput,
+            Function<RelNode, RelNode> leftConversion,
+            FlinkRelNode rightInput,
+            Function<RelNode, RelNode> rightConversion,
+            RelTraitSet providedTraitSet) {
+        final RelNode newRight;
+        if (rightInput instanceof FlinkLogicalSnapshot) {
+            newRight = ((FlinkLogicalSnapshot) rightInput).getInput();
+        } else {
+            newRight = rightInput;
+        }
+
+        return new StreamPhysicalTemporalJoin(
+                join.getCluster(),
+                providedTraitSet,
+                leftConversion.apply(leftInput),
+                rightConversion.apply(newRight),
+                join.getCondition(),
+                join.getJoinType());
+    }
+
+    /** Configuration for {@link StreamPhysicalTemporalJoinRule}. */
+    @Value.Immutable
+    public interface StreamPhysicalTemporalJoinRuleConfig
+            extends 
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig {
+        StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig 
DEFAULT =
+                
ImmutableStreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig
+                        .builder()
+                        .build()
+                        .withOperandSupplier(OPERAND_TRANSFORM)
+                        .withDescription("StreamPhysicalTemporalJoinRule")
+                        .as(
+                                
StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig
+                                        .class);
+
+        @Override
+        default StreamPhysicalTemporalJoinRule toRule() {
+            return new StreamPhysicalTemporalJoinRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
deleted file mode 100644
index aad46100203..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.{TableException, ValidationException}
-import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin
-import 
org.apache.flink.table.planner.plan.utils.IntervalJoinUtil.satisfyIntervalJoin
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-import java.util
-
-import scala.collection.JavaConversions._
-
-/**
- * Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in 
join condition to
- * [[StreamPhysicalIntervalJoin]].
- */
-class StreamPhysicalIntervalJoinRule
-  extends StreamPhysicalJoinRuleBase("StreamPhysicalIntervalJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join: FlinkLogicalJoin = call.rel(0)
-
-    if (!satisfyIntervalJoin(join)) {
-      return false
-    }
-
-    // validate the join
-    val windowBounds = extractWindowBounds(join)._1.get
-
-    if (windowBounds.isEventTime) {
-      val leftTimeAttributeType = join.getLeft.getRowType.getFieldList
-        .get(windowBounds.getLeftTimeIdx)
-        .getType
-      val rightTimeAttributeType = join.getRight.getRowType.getFieldList
-        .get(windowBounds.getRightTimeIdx)
-        .getType
-      if (leftTimeAttributeType.getSqlTypeName != 
rightTimeAttributeType.getSqlTypeName) {
-        throw new ValidationException(
-          String.format(
-            "Interval join with rowtime attribute requires same rowtime 
types," +
-              " but the types are %s and %s.",
-            leftTimeAttributeType.toString,
-            rightTimeAttributeType.toString
-          ))
-      }
-    } else {
-      // Check that no event-time attributes are in the input because the 
processing time window
-      // join does not correctly hold back watermarks.
-      // We rely on projection pushdown to remove unused attributes before the 
join.
-      val joinRowType = join.getRowType
-      val containsRowTime = joinRowType.getFieldList.exists(f => 
isRowtimeIndicatorType(f.getType))
-      if (containsRowTime) {
-        throw new TableException(
-          "Interval join with proctime attribute requires no event-time 
attributes are in the " +
-            "join inputs.")
-      }
-    }
-    true
-  }
-
-  override protected def computeJoinLeftKeys(join: FlinkLogicalJoin): 
util.Collection[Integer] = {
-    val (windowBounds, _) = extractWindowBounds(join)
-    join
-      .analyzeCondition()
-      .leftKeys
-      .filter(k => windowBounds.get.getLeftTimeIdx != k)
-      .toList
-  }
-
-  override protected def computeJoinRightKeys(join: FlinkLogicalJoin): 
util.Collection[Integer] = {
-    val (windowBounds, _) = extractWindowBounds(join)
-    join
-      .analyzeCondition()
-      .rightKeys
-      .filter(k => windowBounds.get.getRightTimeIdx != k)
-      .toList
-  }
-
-  override protected def transform(
-      join: FlinkLogicalJoin,
-      leftInput: FlinkRelNode,
-      leftConversion: RelNode => RelNode,
-      rightInput: FlinkRelNode,
-      rightConversion: RelNode => RelNode,
-      providedTraitSet: RelTraitSet): FlinkRelNode = {
-    val (windowBounds, remainCondition) = extractWindowBounds(join)
-    new StreamPhysicalIntervalJoin(
-      join.getCluster,
-      providedTraitSet,
-      leftConversion(leftInput),
-      rightConversion(rightInput),
-      join.getJoinType,
-      join.getCondition,
-      
remainCondition.getOrElse(join.getCluster.getRexBuilder.makeLiteral(true)),
-      windowBounds.get)
-  }
-}
-
-object StreamPhysicalIntervalJoinRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalIntervalJoinRule
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
deleted file mode 100644
index c59183246f8..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, FlinkLogicalRel, FlinkLogicalSnapshot}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin
-import org.apache.flink.table.planner.plan.utils.JoinUtil.{accessesTimeAttribute, combineJoinInputsRowType, satisfyRegularJoin}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.containsInitialTemporalJoinCondition
-import org.apache.flink.util.Preconditions.checkState
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-import scala.collection.JavaConversions._
-
-/**
- * Rule that converts [[FlinkLogicalJoin]] without window bounds in join condition to
- * [[StreamPhysicalJoin]].
- */
-class StreamPhysicalJoinRule extends StreamPhysicalJoinRuleBase("StreamPhysicalJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join: FlinkLogicalJoin = call.rel(0)
-    val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
-    val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
-
-    if (!satisfyRegularJoin(join, left, right)) {
-      return false
-    }
-
-    // validate the join
-    if (left.isInstanceOf[FlinkLogicalSnapshot]) {
-      throw new TableException(
-        "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.")
-    }
-
-    // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in case which fallback
-    // to regular join
-    checkState(!containsInitialTemporalJoinCondition(join.getCondition))
-
-    // Time attributes must not be in the output type of a regular join
-    val timeAttrInOutput = join.getRowType.getFieldList
-      .exists(f => FlinkTypeFactory.isTimeIndicatorType(f.getType))
-    checkState(!timeAttrInOutput)
-
-    // Join condition must not access time attributes
-    val remainingPredsAccessTime =
-      accessesTimeAttribute(join.getCondition, combineJoinInputsRowType(join))
-    checkState(!remainingPredsAccessTime)
-    true
-  }
-
-  override protected def transform(
-      join: FlinkLogicalJoin,
-      leftInput: FlinkRelNode,
-      leftConversion: RelNode => RelNode,
-      rightInput: FlinkRelNode,
-      rightConversion: RelNode => RelNode,
-      providedTraitSet: RelTraitSet): FlinkRelNode = {
-    new StreamPhysicalJoin(
-      join.getCluster,
-      providedTraitSet,
-      leftConversion(leftInput),
-      rightConversion(rightInput),
-      join.getCondition,
-      join.getJoinType,
-      join.getHints)
-  }
-}
-
-object StreamPhysicalJoinRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalJoinRule
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala
deleted file mode 100644
index cd8d199b7ab..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.nodes.{FlinkConventions, FlinkRelNode}
-import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec.WindowBounds
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, FlinkLogicalRel}
-import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil
-import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapClassLoader, unwrapTableConfig}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexNode
-
-import java.util
-
-/**
- * Base implementation for rules match stream-stream join, including regular stream join, interval
- * join and temporal join.
- */
-abstract class StreamPhysicalJoinRuleBase(description: String)
-  extends RelOptRule(
-    operand(
-      classOf[FlinkLogicalJoin],
-      operand(classOf[FlinkLogicalRel], any()),
-      operand(classOf[FlinkLogicalRel], any())),
-    description) {
-
-  protected def extractWindowBounds(
-      join: FlinkLogicalJoin): (Option[WindowBounds], Option[RexNode]) = {
-    val tableConfig = unwrapTableConfig(join)
-    IntervalJoinUtil.extractWindowBoundsFromPredicate(
-      join.getCondition,
-      join.getLeft.getRowType.getFieldCount,
-      join.getRowType,
-      join.getCluster.getRexBuilder,
-      tableConfig,
-      unwrapClassLoader(join))
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    val left = call.rel[FlinkLogicalRel](1)
-    val right = call.rel[FlinkLogicalRel](2)
-
-    def toHashTraitByColumns(
-        columns: util.Collection[_ <: Number],
-        inputTraitSet: RelTraitSet): RelTraitSet = {
-      val distribution = if (columns.size() == 0) {
-        FlinkRelDistribution.SINGLETON
-      } else {
-        FlinkRelDistribution.hash(columns)
-      }
-      inputTraitSet
-        .replace(FlinkConventions.STREAM_PHYSICAL)
-        .replace(distribution)
-    }
-
-    def convertInput(input: RelNode, columns: util.Collection[_ <: Number]): RelNode = {
-      val requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet)
-      RelOptRule.convert(input, requiredTraitSet)
-    }
-
-    val newJoin = transform(
-      join,
-      left,
-      leftInput => {
-        convertInput(leftInput, computeJoinLeftKeys(join))
-      },
-      right,
-      rightInput => {
-        convertInput(rightInput, computeJoinRightKeys(join))
-      },
-      join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    )
-    call.transformTo(newJoin)
-  }
-
-  protected def computeJoinLeftKeys(join: FlinkLogicalJoin): util.Collection[Integer] =
-    join.analyzeCondition().leftKeys
-
-  protected def computeJoinRightKeys(join: FlinkLogicalJoin): util.Collection[Integer] =
-    join.analyzeCondition().rightKeys
-
-  protected def transform(
-      join: FlinkLogicalJoin,
-      leftInput: FlinkRelNode,
-      leftConversion: RelNode => RelNode,
-      rightInput: FlinkRelNode,
-      rightConversion: RelNode => RelNode,
-      providedTraitSet: RelTraitSet): FlinkRelNode
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
deleted file mode 100644
index 0e3a9846282..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical._
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{containsInitialTemporalJoinCondition, satisfyTemporalJoin}
-import org.apache.flink.util.Preconditions.checkState
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-/**
- * Rule that matches a temporal join node and converts it to [[StreamPhysicalTemporalJoin]], the
- * temporal join node is a [[FlinkLogicalJoin]] which contains [[TEMPORAL_JOIN_CONDITION]].
- */
-class StreamPhysicalTemporalJoinRule
-  extends StreamPhysicalJoinRuleBase("StreamPhysicalTemporalJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    if (!satisfyTemporalJoin(join)) {
-      return false
-    }
-
-    // validate the join
-    // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase.
-    checkState(!containsInitialTemporalJoinCondition(join.getCondition))
-    true
-  }
-
-  override protected def transform(
-      join: FlinkLogicalJoin,
-      leftInput: FlinkRelNode,
-      leftConversion: RelNode => RelNode,
-      rightInput: FlinkRelNode,
-      rightConversion: RelNode => RelNode,
-      providedTraitSet: RelTraitSet): FlinkRelNode = {
-    val newRight = rightInput match {
-      case snapshot: FlinkLogicalSnapshot =>
-        snapshot.getInput
-      case rel: FlinkLogicalRel => rel
-    }
-    new StreamPhysicalTemporalJoin(
-      join.getCluster,
-      providedTraitSet,
-      leftConversion(leftInput),
-      rightConversion(newRight),
-      join.getCondition,
-      join.getJoinType)
-  }
-}
-
-object StreamPhysicalTemporalJoinRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalTemporalJoinRule
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index 58b2a2d5526..4d06a1607ae 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -70,7 +70,7 @@ object IntervalJoinUtil {
    * @return
    *   A Tuple2 of extracted window bounds and remaining predicates.
    */
-  private[flink] def extractWindowBoundsFromPredicate(
+  def extractWindowBoundsFromPredicate(
       predicate: RexNode,
       leftLogicalFieldCnt: Int,
       joinRowType: RelDataType,


Reply via email to