This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 73993fc55e3 [FLINK-38885][table] Migrate `StreamPhysicalJoinRuleBase`
and its children
73993fc55e3 is described below
commit 73993fc55e379d502c14f154a9bcb587a6539927
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Jan 24 09:49:07 2026 +0100
[FLINK-38885][table] Migrate `StreamPhysicalJoinRuleBase` and its children
---
.../stream/StreamPhysicalIntervalJoinRule.java | 166 +++++++++++++++++++++
.../physical/stream/StreamPhysicalJoinRule.java | 123 +++++++++++++++
.../stream/StreamPhysicalJoinRuleBase.java | 140 +++++++++++++++++
.../stream/StreamPhysicalTemporalJoinRule.java | 107 +++++++++++++
.../stream/StreamPhysicalIntervalJoinRule.scala | 122 ---------------
.../physical/stream/StreamPhysicalJoinRule.scala | 91 -----------
.../stream/StreamPhysicalJoinRuleBase.scala | 109 --------------
.../stream/StreamPhysicalTemporalJoinRule.scala | 72 ---------
.../planner/plan/utils/IntervalJoinUtil.scala | 2 +-
9 files changed, 537 insertions(+), 395 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
new file mode 100644
index 00000000000..6354f04de07
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin;
+import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.immutables.value.Value;
+
+import java.util.Collection;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+/**
+ * Rule that converts non-SEMI/ANTI {@link FlinkLogicalJoin} with window
bounds in join condition to
+ * {@link StreamPhysicalIntervalJoin}.
+ */
[email protected]
+public class StreamPhysicalIntervalJoinRule
+ extends StreamPhysicalJoinRuleBase<
+
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig> {
+ public static final RelOptRule INSTANCE =
StreamPhysicalIntervalJoinRuleConfig.DEFAULT.toRule();
+
+ public StreamPhysicalIntervalJoinRule(StreamPhysicalIntervalJoinRuleConfig
config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ FlinkLogicalJoin join = call.rel(0);
+
+ if (!IntervalJoinUtil.satisfyIntervalJoin(join)) {
+ return false;
+ }
+
+ // validate the join
+ IntervalJoinSpec.WindowBounds windowBounds =
extractWindowBounds(join).f0.get();
+
+ if (windowBounds.isEventTime()) {
+ RelDataType leftTimeAttributeType =
+ join.getLeft()
+ .getRowType()
+ .getFieldList()
+ .get(windowBounds.getLeftTimeIdx())
+ .getType();
+ RelDataType rightTimeAttributeType =
+ join.getRight()
+ .getRowType()
+ .getFieldList()
+ .get(windowBounds.getRightTimeIdx())
+ .getType();
+ if (leftTimeAttributeType.getSqlTypeName() !=
rightTimeAttributeType.getSqlTypeName()) {
+ throw new ValidationException(
+ String.format(
+ "Interval join with rowtime attribute requires
same rowtime types,"
+ + " but the types are %s and %s.",
+ leftTimeAttributeType,
rightTimeAttributeType));
+ }
+ } else {
+ // Check that no event-time attributes are in the input because
the processing time
+ // window
+ // join does not correctly hold back watermarks.
+ // We rely on projection pushdown to remove unused attributes
before the join.
+ RelDataType joinRowType = join.getRowType();
+ boolean containsRowTime =
+ joinRowType.getFieldList().stream()
+ .anyMatch(f ->
FlinkTypeFactory.isRowtimeIndicatorType(f.getType()));
+ if (containsRowTime) {
+ throw new TableException(
+ "Interval join with proctime attribute requires no
event-time attributes are in the "
+ + "join inputs.");
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Collection<Integer> computeJoinLeftKeys(FlinkLogicalJoin join) {
+ Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+ extractWindowBounds(join);
+ return join.analyzeCondition().leftKeys.stream()
+ .filter(k -> tuple2.f0.get().getLeftTimeIdx() != k)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Collection<Integer> computeJoinRightKeys(FlinkLogicalJoin join) {
+ Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+ extractWindowBounds(join);
+ return join.analyzeCondition().rightKeys.stream()
+ .filter(k -> tuple2.f0.get().getRightTimeIdx() != k)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public FlinkRelNode transform(
+ FlinkLogicalJoin join,
+ FlinkRelNode leftInput,
+ Function<RelNode, RelNode> leftConversion,
+ FlinkRelNode rightInput,
+ Function<RelNode, RelNode> rightConversion,
+ RelTraitSet providedTraitSet) {
+ Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
+ extractWindowBounds(join);
+ return new StreamPhysicalIntervalJoin(
+ join.getCluster(),
+ providedTraitSet,
+ leftConversion.apply(leftInput),
+ rightConversion.apply(rightInput),
+ join.getJoinType(),
+ join.getCondition(),
+ tuple2.f1.getOrElse(() ->
join.getCluster().getRexBuilder().makeLiteral(true)),
+ tuple2.f0.get());
+ }
+
+ /** Configuration for {@link StreamPhysicalIntervalJoinRule}. */
+ @Value.Immutable
+ public interface StreamPhysicalIntervalJoinRuleConfig
+ extends StreamPhysicalJoinRuleBaseRuleConfig {
+ StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
DEFAULT =
+
ImmutableStreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
+ .builder()
+ .build()
+
.withOperandSupplier(StreamPhysicalJoinRuleBaseRuleConfig.OPERAND_TRANSFORM)
+ .withDescription("StreamPhysicalIntervalJoinRule")
+ .as(
+
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
+ .class);
+
+ @Override
+ default StreamPhysicalIntervalJoinRule toRule() {
+ return new StreamPhysicalIntervalJoinRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
new file mode 100644
index 00000000000..66be2a24096
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+import java.util.function.Function;
+
+/**
+ * Rule that converts {@link FlinkLogicalJoin} without window bounds in join
condition to {@link
+ * StreamPhysicalJoin}.
+ */
[email protected]
+public class StreamPhysicalJoinRule
+ extends
StreamPhysicalJoinRuleBase<StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig>
{
+ public static final StreamPhysicalJoinRule INSTANCE =
+ StreamPhysicalJoinRuleConfig.DEFAULT.toRule();
+
+ public
StreamPhysicalJoinRule(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig
config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final FlinkLogicalJoin join = call.rel(0);
+ final FlinkLogicalRel left = call.rel(1);
+ final FlinkLogicalRel right = call.rel(2);
+
+ if (!JoinUtil.satisfyRegularJoin(join, left, right)) {
+ return false;
+ }
+
+ // validate the join
+ if (left instanceof FlinkLogicalSnapshot) {
+ throw new TableException(
+ "Temporal table join only support apply FOR SYSTEM_TIME AS
OF on the right table.");
+ }
+
+ // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase
in case which
+ // fallback
+ // to regular join
+ Preconditions.checkState(
+
!TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition()));
+
+ // Time attributes must not be in the output type of a regular join
+ boolean timeAttrInOutput =
+ join.getRowType().getFieldList().stream()
+ .anyMatch(f ->
FlinkTypeFactory.isTimeIndicatorType(f.getType()));
+ Preconditions.checkState(!timeAttrInOutput);
+
+ // Join condition must not access time attributes
+ boolean remainingPredsAccessTime =
+ JoinUtil.accessesTimeAttribute(
+ join.getCondition(),
JoinUtil.combineJoinInputsRowType(join));
+ Preconditions.checkState(!remainingPredsAccessTime);
+ return true;
+ }
+
+ @Override
+ public FlinkRelNode transform(
+ FlinkLogicalJoin join,
+ FlinkRelNode leftInput,
+ Function<RelNode, RelNode> leftConversion,
+ FlinkRelNode rightInput,
+ Function<RelNode, RelNode> rightConversion,
+ RelTraitSet providedTraitSet) {
+ return new StreamPhysicalJoin(
+ join.getCluster(),
+ providedTraitSet,
+ leftConversion.apply(leftInput),
+ rightConversion.apply(rightInput),
+ join.getCondition(),
+ join.getJoinType(),
+ join.getHints());
+ }
+
+ /** Configuration for {@link StreamPhysicalJoinRule}. */
+ @Value.Immutable
+ public interface StreamPhysicalJoinRuleConfig
+ extends
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig {
+ StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig DEFAULT =
+
ImmutableStreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.builder()
+ .build()
+ .withOperandSupplier(OPERAND_TRANSFORM)
+ .withDescription("StreamPhysicalJoinRule")
+
.as(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.class);
+
+ @Override
+ default StreamPhysicalJoinRule toRule() {
+ return new StreamPhysicalJoinRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
new file mode 100644
index 00000000000..339ad280d04
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.immutables.value.Value;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+import scala.Option;
+
+/**
+ * Base implementation for rules that match stream-stream joins, including regular stream join,
+ * interval join and temporal join.
+ */
[email protected]
+public abstract class StreamPhysicalJoinRuleBase<
+ T extends
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig>
+ extends
RelRule<StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig> {
+
+ protected StreamPhysicalJoinRuleBase(T config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ FlinkLogicalJoin join = call.rel(0);
+ FlinkLogicalRel left = call.rel(1);
+ FlinkLogicalRel right = call.rel(2);
+ FlinkRelNode newJoin =
+ transform(
+ join,
+ left,
+ leftInput -> convertInput(leftInput,
computeJoinLeftKeys(join)),
+ right,
+ rightInput -> convertInput(rightInput,
computeJoinRightKeys(join)),
+
join.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()));
+ call.transformTo(newJoin);
+ }
+
+ protected Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>>
extractWindowBounds(
+ FlinkLogicalJoin join) {
+ TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join);
+ return JavaScalaConversionUtil.toJava(
+ IntervalJoinUtil.extractWindowBoundsFromPredicate(
+ join.getCondition(),
+ join.getLeft().getRowType().getFieldCount(),
+ join.getRowType(),
+ join.getCluster().getRexBuilder(),
+ tableConfig,
+ ShortcutUtils.unwrapClassLoader(join)));
+ }
+
+ private RelNode convertInput(RelNode input, Collection<Integer> columns) {
+ RelTraitSet requiredTraitSet = toHashTraitByColumns(columns,
input.getTraitSet());
+ return RelOptRule.convert(input, requiredTraitSet);
+ }
+
+ private RelTraitSet toHashTraitByColumns(
+ Collection<Integer> columns, RelTraitSet inputTraitSet) {
+ FlinkRelDistribution distribution =
+ columns.isEmpty()
+ ? FlinkRelDistribution.SINGLETON()
+ : FlinkRelDistribution.hash(
+
columns.stream().mapToInt(Integer::intValue).toArray(), true);
+ return
inputTraitSet.replace(FlinkConventions.STREAM_PHYSICAL()).replace(distribution);
+ }
+
+ public Collection<Integer> computeJoinLeftKeys(FlinkLogicalJoin join) {
+ return join.analyzeCondition().leftKeys;
+ }
+
+ public Collection<Integer> computeJoinRightKeys(FlinkLogicalJoin join) {
+ return join.analyzeCondition().rightKeys;
+ }
+
+ public abstract FlinkRelNode transform(
+ FlinkLogicalJoin join,
+ FlinkRelNode leftInput,
+ Function<RelNode, RelNode> leftConversion,
+ FlinkRelNode rightInput,
+ Function<RelNode, RelNode> rightConversion,
+ RelTraitSet providedTraitSet);
+
+ /** Configuration for {@link StreamPhysicalJoinRuleBase}. */
+ @Value.Immutable
+ public interface StreamPhysicalJoinRuleBaseRuleConfig extends
RelRule.Config {
+ RelRule.OperandTransform OPERAND_TRANSFORM =
+ b0 ->
+ b0.operand(FlinkLogicalJoin.class)
+ .inputs(
+ b1 ->
b1.operand(FlinkLogicalRel.class).anyInputs(),
+ b2 ->
b2.operand(FlinkLogicalRel.class).anyInputs());
+ StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig
DEFAULT =
+
ImmutableStreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.builder()
+ .build()
+ .withOperandSupplier(OPERAND_TRANSFORM)
+ .withDescription("StreamPhysicalJoinRuleBase")
+
.as(StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.class);
+
+ @Override
+ default StreamPhysicalJoinRuleBase toRule() {
+ throw new UnsupportedOperationException("Abstract base rule cannot be instantiated.");
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
new file mode 100644
index 00000000000..4c7877a0c94
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+import java.util.function.Function;
+
+/**
+ * Rule that matches a temporal join node and converts it to {@link
StreamPhysicalTemporalJoin}, the
+ * temporal join node is a {@link FlinkLogicalJoin} which contains {@code
TEMPORAL_JOIN_CONDITION}.
+ */
[email protected]
+public class StreamPhysicalTemporalJoinRule
+ extends StreamPhysicalJoinRuleBase<
+
StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig> {
+ public static final StreamPhysicalTemporalJoinRule INSTANCE =
+ StreamPhysicalTemporalJoinRuleConfig.DEFAULT.toRule();
+
+ public StreamPhysicalTemporalJoinRule(StreamPhysicalTemporalJoinRuleConfig
config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ FlinkLogicalJoin join = call.rel(0);
+ if (!TemporalJoinUtil.satisfyTemporalJoin(join)) {
+ return false;
+ }
+
+ // validate the join
+ // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase.
+ Preconditions.checkState(
+
!TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition()));
+ return true;
+ }
+
+ @Override
+ public FlinkRelNode transform(
+ FlinkLogicalJoin join,
+ FlinkRelNode leftInput,
+ Function<RelNode, RelNode> leftConversion,
+ FlinkRelNode rightInput,
+ Function<RelNode, RelNode> rightConversion,
+ RelTraitSet providedTraitSet) {
+ final RelNode newRight;
+ if (rightInput instanceof FlinkLogicalSnapshot) {
+ newRight = ((FlinkLogicalSnapshot) rightInput).getInput();
+ } else {
+ newRight = rightInput;
+ }
+
+ return new StreamPhysicalTemporalJoin(
+ join.getCluster(),
+ providedTraitSet,
+ leftConversion.apply(leftInput),
+ rightConversion.apply(newRight),
+ join.getCondition(),
+ join.getJoinType());
+ }
+
+ /** Configuration for {@link StreamPhysicalTemporalJoinRule}. */
+ @Value.Immutable
+ public interface StreamPhysicalTemporalJoinRuleConfig
+ extends
StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig {
+ StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig
DEFAULT =
+
ImmutableStreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig
+ .builder()
+ .build()
+ .withOperandSupplier(OPERAND_TRANSFORM)
+ .withDescription("StreamPhysicalTemporalJoinRule")
+ .as(
+
StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig
+ .class);
+
+ @Override
+ default StreamPhysicalTemporalJoinRule toRule() {
+ return new StreamPhysicalTemporalJoinRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
deleted file mode 100644
index aad46100203..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.{TableException, ValidationException}
-import
org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin
-import
org.apache.flink.table.planner.plan.utils.IntervalJoinUtil.satisfyIntervalJoin
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-import java.util
-
-import scala.collection.JavaConversions._
-
-/**
- * Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in
join condition to
- * [[StreamPhysicalIntervalJoin]].
- */
-class StreamPhysicalIntervalJoinRule
- extends StreamPhysicalJoinRuleBase("StreamPhysicalIntervalJoinRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val join: FlinkLogicalJoin = call.rel(0)
-
- if (!satisfyIntervalJoin(join)) {
- return false
- }
-
- // validate the join
- val windowBounds = extractWindowBounds(join)._1.get
-
- if (windowBounds.isEventTime) {
- val leftTimeAttributeType = join.getLeft.getRowType.getFieldList
- .get(windowBounds.getLeftTimeIdx)
- .getType
- val rightTimeAttributeType = join.getRight.getRowType.getFieldList
- .get(windowBounds.getRightTimeIdx)
- .getType
- if (leftTimeAttributeType.getSqlTypeName !=
rightTimeAttributeType.getSqlTypeName) {
- throw new ValidationException(
- String.format(
- "Interval join with rowtime attribute requires same rowtime
types," +
- " but the types are %s and %s.",
- leftTimeAttributeType.toString,
- rightTimeAttributeType.toString
- ))
- }
- } else {
- // Check that no event-time attributes are in the input because the
processing time window
- // join does not correctly hold back watermarks.
- // We rely on projection pushdown to remove unused attributes before the
join.
- val joinRowType = join.getRowType
- val containsRowTime = joinRowType.getFieldList.exists(f =>
isRowtimeIndicatorType(f.getType))
- if (containsRowTime) {
- throw new TableException(
- "Interval join with proctime attribute requires no event-time
attributes are in the " +
- "join inputs.")
- }
- }
- true
- }
-
- override protected def computeJoinLeftKeys(join: FlinkLogicalJoin):
util.Collection[Integer] = {
- val (windowBounds, _) = extractWindowBounds(join)
- join
- .analyzeCondition()
- .leftKeys
- .filter(k => windowBounds.get.getLeftTimeIdx != k)
- .toList
- }
-
- override protected def computeJoinRightKeys(join: FlinkLogicalJoin):
util.Collection[Integer] = {
- val (windowBounds, _) = extractWindowBounds(join)
- join
- .analyzeCondition()
- .rightKeys
- .filter(k => windowBounds.get.getRightTimeIdx != k)
- .toList
- }
-
- override protected def transform(
- join: FlinkLogicalJoin,
- leftInput: FlinkRelNode,
- leftConversion: RelNode => RelNode,
- rightInput: FlinkRelNode,
- rightConversion: RelNode => RelNode,
- providedTraitSet: RelTraitSet): FlinkRelNode = {
- val (windowBounds, remainCondition) = extractWindowBounds(join)
- new StreamPhysicalIntervalJoin(
- join.getCluster,
- providedTraitSet,
- leftConversion(leftInput),
- rightConversion(rightInput),
- join.getJoinType,
- join.getCondition,
-
remainCondition.getOrElse(join.getCluster.getRexBuilder.makeLiteral(true)),
- windowBounds.get)
- }
-}
-
-object StreamPhysicalIntervalJoinRule {
- val INSTANCE: RelOptRule = new StreamPhysicalIntervalJoinRule
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
deleted file mode 100644
index c59183246f8..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin,
FlinkLogicalRel, FlinkLogicalSnapshot}
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin
-import
org.apache.flink.table.planner.plan.utils.JoinUtil.{accessesTimeAttribute,
combineJoinInputsRowType, satisfyRegularJoin}
-import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.containsInitialTemporalJoinCondition
-import org.apache.flink.util.Preconditions.checkState
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-import scala.collection.JavaConversions._
-
-/**
- * Rule that converts [[FlinkLogicalJoin]] without window bounds in join
condition to
- * [[StreamPhysicalJoin]].
- */
-class StreamPhysicalJoinRule extends
StreamPhysicalJoinRuleBase("StreamPhysicalJoinRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val join: FlinkLogicalJoin = call.rel(0)
- val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
- val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
-
- if (!satisfyRegularJoin(join, left, right)) {
- return false
- }
-
- // validate the join
- if (left.isInstanceOf[FlinkLogicalSnapshot]) {
- throw new TableException(
- "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the
right table.")
- }
-
- // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in
case which fallback
- // to regular join
- checkState(!containsInitialTemporalJoinCondition(join.getCondition))
-
- // Time attributes must not be in the output type of a regular join
- val timeAttrInOutput = join.getRowType.getFieldList
- .exists(f => FlinkTypeFactory.isTimeIndicatorType(f.getType))
- checkState(!timeAttrInOutput)
-
- // Join condition must not access time attributes
- val remainingPredsAccessTime =
- accessesTimeAttribute(join.getCondition, combineJoinInputsRowType(join))
- checkState(!remainingPredsAccessTime)
- true
- }
-
- override protected def transform(
- join: FlinkLogicalJoin,
- leftInput: FlinkRelNode,
- leftConversion: RelNode => RelNode,
- rightInput: FlinkRelNode,
- rightConversion: RelNode => RelNode,
- providedTraitSet: RelTraitSet): FlinkRelNode = {
- new StreamPhysicalJoin(
- join.getCluster,
- providedTraitSet,
- leftConversion(leftInput),
- rightConversion(rightInput),
- join.getCondition,
- join.getJoinType,
- join.getHints)
- }
-}
-
-object StreamPhysicalJoinRule {
- val INSTANCE: RelOptRule = new StreamPhysicalJoinRule
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala
deleted file mode 100644
index cd8d199b7ab..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.nodes.{FlinkConventions,
FlinkRelNode}
-import
org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec.WindowBounds
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin,
FlinkLogicalRel}
-import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil
-import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapClassLoader,
unwrapTableConfig}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexNode
-
-import java.util
-
-/**
- * Base implementation for rules match stream-stream join, including regular
stream join, interval
- * join and temporal join.
- */
-abstract class StreamPhysicalJoinRuleBase(description: String)
- extends RelOptRule(
- operand(
- classOf[FlinkLogicalJoin],
- operand(classOf[FlinkLogicalRel], any()),
- operand(classOf[FlinkLogicalRel], any())),
- description) {
-
- protected def extractWindowBounds(
- join: FlinkLogicalJoin): (Option[WindowBounds], Option[RexNode]) = {
- val tableConfig = unwrapTableConfig(join)
- IntervalJoinUtil.extractWindowBoundsFromPredicate(
- join.getCondition,
- join.getLeft.getRowType.getFieldCount,
- join.getRowType,
- join.getCluster.getRexBuilder,
- tableConfig,
- unwrapClassLoader(join))
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val join = call.rel[FlinkLogicalJoin](0)
- val left = call.rel[FlinkLogicalRel](1)
- val right = call.rel[FlinkLogicalRel](2)
-
- def toHashTraitByColumns(
- columns: util.Collection[_ <: Number],
- inputTraitSet: RelTraitSet): RelTraitSet = {
- val distribution = if (columns.size() == 0) {
- FlinkRelDistribution.SINGLETON
- } else {
- FlinkRelDistribution.hash(columns)
- }
- inputTraitSet
- .replace(FlinkConventions.STREAM_PHYSICAL)
- .replace(distribution)
- }
-
- def convertInput(input: RelNode, columns: util.Collection[_ <: Number]):
RelNode = {
- val requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet)
- RelOptRule.convert(input, requiredTraitSet)
- }
-
- val newJoin = transform(
- join,
- left,
- leftInput => {
- convertInput(leftInput, computeJoinLeftKeys(join))
- },
- right,
- rightInput => {
- convertInput(rightInput, computeJoinRightKeys(join))
- },
- join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- )
- call.transformTo(newJoin)
- }
-
- protected def computeJoinLeftKeys(join: FlinkLogicalJoin):
util.Collection[Integer] =
- join.analyzeCondition().leftKeys
-
- protected def computeJoinRightKeys(join: FlinkLogicalJoin):
util.Collection[Integer] =
- join.analyzeCondition().rightKeys
-
- protected def transform(
- join: FlinkLogicalJoin,
- leftInput: FlinkRelNode,
- leftConversion: RelNode => RelNode,
- rightInput: FlinkRelNode,
- rightConversion: RelNode => RelNode,
- providedTraitSet: RelTraitSet): FlinkRelNode
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
deleted file mode 100644
index 0e3a9846282..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
-import org.apache.flink.table.planner.plan.nodes.logical._
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin
-import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{containsInitialTemporalJoinCondition,
satisfyTemporalJoin}
-import org.apache.flink.util.Preconditions.checkState
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-
-/**
- * Rule that matches a temporal join node and converts it to
[[StreamPhysicalTemporalJoin]], the
- * temporal join node is a [[FlinkLogicalJoin]] which contains
[[TEMPORAL_JOIN_CONDITION]].
- */
-class StreamPhysicalTemporalJoinRule
- extends StreamPhysicalJoinRuleBase("StreamPhysicalTemporalJoinRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val join = call.rel[FlinkLogicalJoin](0)
- if (!satisfyTemporalJoin(join)) {
- return false
- }
-
- // validate the join
- // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase.
- checkState(!containsInitialTemporalJoinCondition(join.getCondition))
- true
- }
-
- override protected def transform(
- join: FlinkLogicalJoin,
- leftInput: FlinkRelNode,
- leftConversion: RelNode => RelNode,
- rightInput: FlinkRelNode,
- rightConversion: RelNode => RelNode,
- providedTraitSet: RelTraitSet): FlinkRelNode = {
- val newRight = rightInput match {
- case snapshot: FlinkLogicalSnapshot =>
- snapshot.getInput
- case rel: FlinkLogicalRel => rel
- }
- new StreamPhysicalTemporalJoin(
- join.getCluster,
- providedTraitSet,
- leftConversion(leftInput),
- rightConversion(newRight),
- join.getCondition,
- join.getJoinType)
- }
-}
-
-object StreamPhysicalTemporalJoinRule {
- val INSTANCE: RelOptRule = new StreamPhysicalTemporalJoinRule
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index 58b2a2d5526..4d06a1607ae 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -70,7 +70,7 @@ object IntervalJoinUtil {
* @return
* A Tuple2 of extracted window bounds and remaining predicates.
*/
- private[flink] def extractWindowBoundsFromPredicate(
+ def extractWindowBoundsFromPredicate(
predicate: RexNode,
leftLogicalFieldCnt: Int,
joinRowType: RelDataType,