This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 55dc9fa6922aacab3258159affe5867934771c6e Author: liuyongvs <[email protected]> AuthorDate: Tue Apr 9 15:23:22 2024 +0800 [FLINK-35054][table] Migrate TemporalJoinRewriteWithUniqueKeyRule to java This closes #24635 --- .../TemporalJoinRewriteWithUniqueKeyRule.java | 304 +++++++++++++++++++++ .../TemporalJoinRewriteWithUniqueKeyRule.scala | 201 -------------- 2 files changed, 304 insertions(+), 201 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java new file mode 100644 index 00000000000..342cafd2526 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
+import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+/**
+ * Planner rule that rewrites temporal join with extracted primary key, Event-time temporal table
+ * join requires primary key and row time attribute of versioned table. The versioned table could be
+ * a table source or a view only if it contains the unique key and time attribute.
+ *
+ * <p>Flink supports extract the primary key and row time attribute from the view if the view comes
+ * from {@link StreamPhysicalRank} node which can convert to a {@link StreamPhysicalDeduplicate}
+ * node.
+ */
+@Value.Enclosing
+public class TemporalJoinRewriteWithUniqueKeyRule
+        extends RelRule<
+                TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig>
+        implements CommonTemporalTableJoinRule {
+
+    public static final TemporalJoinRewriteWithUniqueKeyRule INSTANCE =
+            TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig.DEFAULT
+                    .toRule();
+
+    private TemporalJoinRewriteWithUniqueKeyRule(
+            TemporalJoinRewriteWithUniqueKeyRuleConfig config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        FlinkLogicalSnapshot snapshot = call.rel(2);
+        FlinkLogicalRel snapshotInput = call.rel(3);
+
+        boolean isTemporalJoin = matches(snapshot);
+        boolean canConvertToLookup = canConvertToLookupJoin(snapshot, snapshotInput);
+        List<JoinRelType> supportedJoinTypes = Arrays.asList(JoinRelType.INNER, JoinRelType.LEFT);
+
+        return isTemporalJoin
+                && !canConvertToLookup
+                && supportedJoinTypes.contains(join.getJoinType());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        FlinkLogicalRel leftInput = call.rel(1);
+        FlinkLogicalSnapshot snapshot = call.rel(2);
+        FlinkLogicalRel snapshotInput = call.rel(3);
+
+        RexNode joinCondition = join.getCondition();
+        RexNode newJoinCondition =
+                joinCondition.accept(
+                        new RexShuttle() {
+                            @Override
+                            public RexNode visitCall(RexCall call) {
+                                if (call.getOperator()
+                                        .equals(
+                                                TemporalJoinUtil
+                                                        .INITIAL_TEMPORAL_JOIN_CONDITION())) {
+                                    RexNode snapshotTimeInputRef;
+                                    List<RexNode> leftJoinKey;
+                                    List<RexNode> rightJoinKey;
+
+                                    if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
+                                        snapshotTimeInputRef = call.operands.get(0);
+                                        leftJoinKey =
+                                                ((RexCall) call.operands.get(2)).getOperands();
+                                        rightJoinKey =
+                                                ((RexCall) call.operands.get(3)).getOperands();
+                                    } else {
+                                        snapshotTimeInputRef = call.operands.get(0);
+                                        leftJoinKey =
+                                                ((RexCall) call.operands.get(1)).getOperands();
+                                        rightJoinKey =
+                                                ((RexCall) call.operands.get(2)).getOperands();
+                                    }
+
+                                    RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+                                    Optional<List<RexNode>> primaryKeyInputRefs =
+                                            extractPrimaryKeyInputRefs(
+                                                    leftInput, snapshot, snapshotInput, rexBuilder);
+                                    validateRightPrimaryKey(
+                                            join, rightJoinKey, primaryKeyInputRefs);
+
+                                    if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
+                                        RexNode rightTimeInputRef = call.operands.get(1);
+                                        return TemporalJoinUtil.makeRowTimeTemporalTableJoinConCall(
+                                                rexBuilder,
+                                                snapshotTimeInputRef,
+                                                rightTimeInputRef,
+                                                JavaConverters.asScalaBufferConverter(
+                                                                primaryKeyInputRefs.get())
+                                                        .asScala(),
+                                                JavaConverters.asScalaBufferConverter(leftJoinKey)
+                                                        .asScala(),
+                                                JavaConverters.asScalaBufferConverter(rightJoinKey)
+                                                        .asScala());
+                                    } else {
+                                        return TemporalJoinUtil
+                                                .makeProcTimeTemporalTableJoinConCall(
+                                                        rexBuilder,
+                                                        snapshotTimeInputRef,
+                                                        JavaConverters.asScalaBufferConverter(
+                                                                        primaryKeyInputRefs.get())
+                                                                .asScala(),
+                                                        JavaConverters.asScalaBufferConverter(
+                                                                        leftJoinKey)
+                                                                .asScala(),
+                                                        JavaConverters.asScalaBufferConverter(
+                                                                        rightJoinKey)
+                                                                .asScala());
+                                    }
+                                } else {
+                                    return super.visitCall(call);
+                                }
+                            }
+                        });
+        RelNode rewriteJoin =
+                FlinkLogicalJoin.create(
+                        leftInput, snapshot, newJoinCondition, join.getHints(), join.getJoinType());
+        call.transformTo(rewriteJoin);
+    }
+
+    private void validateRightPrimaryKey(
+            FlinkLogicalJoin join,
+            List<RexNode> rightJoinKeyExpressions,
+            Optional<List<RexNode>> rightPrimaryKeyInputRefs) {
+
+        if (!rightPrimaryKeyInputRefs.isPresent()) {
+            throw new ValidationException(
+                    "Temporal Table Join requires primary key in versioned table, "
+                            + "but no primary key can be found. "
+                            + "The physical plan is:\n"
+                            + RelOptUtil.toString(join)
+                            + "\n");
+        }
+
+        List<Integer> rightJoinKeyRefIndices =
+                rightJoinKeyExpressions.stream()
+                        .map(rex -> ((RexInputRef) rex).getIndex())
+                        .collect(Collectors.toList());
+
+        List<Integer> rightPrimaryKeyRefIndices =
+                rightPrimaryKeyInputRefs.get().stream()
+                        .map(rex -> ((RexInputRef) rex).getIndex())
+                        .collect(Collectors.toList());
+
+        boolean primaryKeyContainedInJoinKey =
+                rightPrimaryKeyRefIndices.stream()
+                        .allMatch(pk -> rightJoinKeyRefIndices.contains(pk));
+
+        if (!primaryKeyContainedInJoinKey) {
+            List<String> joinFieldNames = join.getRowType().getFieldNames();
+            List<String> joinLeftFieldNames = join.getLeft().getRowType().getFieldNames();
+            List<String> joinRightFieldNames = join.getRight().getRowType().getFieldNames();
+
+            String primaryKeyNames =
+                    rightPrimaryKeyRefIndices.stream()
+                            .map(i -> joinFieldNames.get(i))
+                            .collect(Collectors.joining(","));
+
+            String joinEquiInfo =
+                    join.analyzeCondition().pairs().stream()
+                            .map(
+                                    pair ->
+                                            joinLeftFieldNames.get(pair.source)
+                                                    + "="
+                                                    + joinRightFieldNames.get(pair.target))
+                            .collect(Collectors.joining(","));
+
+            throw new ValidationException(
+                    "Temporal table's primary key ["
+                            + primaryKeyNames
+                            + "] must be included in the "
+                            + "equivalence condition of temporal join, but current temporal join condition is ["
+                            + joinEquiInfo
+                            + "].");
+        }
+    }
+
+    private Optional<List<RexNode>> extractPrimaryKeyInputRefs(
+            RelNode leftInput,
+            FlinkLogicalSnapshot snapshot,
+            FlinkLogicalRel snapshotInput,
+            RexBuilder rexBuilder) {
+        List<RelDataTypeField> rightFields = snapshot.getRowType().getFieldList();
+        FlinkRelMetadataQuery fmq =
+                FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster().getMetadataQuery());
+
+        Set<ImmutableBitSet> upsertKeySet = fmq.getUpsertKeys(snapshotInput);
+        List<RelDataTypeField> fields = snapshot.getRowType().getFieldList();
+
+        if (upsertKeySet != null && !upsertKeySet.isEmpty()) {
+            int leftFieldCnt = leftInput.getRowType().getFieldCount();
+            List<List<RexNode>> upsertKeySetInputRefs =
+                    upsertKeySet.stream()
+                            .filter(bitSet -> !bitSet.isEmpty())
+                            .map(
+                                    bitSet ->
+                                            Arrays.stream(bitSet.toArray())
+                                                    .mapToObj(index -> fields.get(index))
+                                                    // build InputRef of upsert key in snapshot
+                                                    .map(
+                                                            f ->
+                                                                    (RexNode)
+                                                                            rexBuilder.makeInputRef(
+                                                                                    f.getType(),
+                                                                                    leftFieldCnt
+                                                                                            + rightFields
+                                                                                                    .indexOf(
+                                                                                                            f)))
+                                                    .collect(Collectors.toList()))
+                            .collect(Collectors.toList());
+            // select shortest upsert key as primary key
+            return upsertKeySetInputRefs.stream()
+                    .sorted(Comparator.comparingInt(List::size))
+                    .findFirst();
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface TemporalJoinRewriteWithUniqueKeyRuleConfig extends RelRule.Config {
+        TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig DEFAULT =
+                ImmutableTemporalJoinRewriteWithUniqueKeyRule
+                        .TemporalJoinRewriteWithUniqueKeyRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        b1 ->
+                                                                b1.operand(FlinkLogicalRel.class)
+                                                                        .anyInputs(),
+                                                        b2 ->
+                                                                b2.operand(
+                                                                                FlinkLogicalSnapshot
+                                                                                        .class)
+                                                                        .oneInput(
+                                                                                b3 ->
+                                                                                        b3.operand(
+                                                                                                        FlinkLogicalRel
+                                                                                                                .class)
+                                                                                                .anyInputs())))
+                        .withDescription("TemporalJoinRewriteWithUniqueKeyRule");
+
+        @Override
+        default TemporalJoinRewriteWithUniqueKeyRule toRule() {
+            return new TemporalJoinRewriteWithUniqueKeyRule(this);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
deleted file mode 100644
index 74a17e5f55e..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, FlinkLogicalRel, FlinkLogicalSnapshot}
-import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-
-/**
- * Planner rule that rewrites temporal join with extracted primary key, Event-time temporal table
- * join requires primary key and row time attribute of versioned table. The versioned table could be
- * a table source or a view only if it contains the unique key and time attribute.
- *
- * <p> Flink supports extract the primary key and row time attribute from the view if the view comes
- * from [[LogicalRank]] node which can convert to a [[Deduplicate]] node.
- */
-class TemporalJoinRewriteWithUniqueKeyRule
-  extends RelOptRule(
-    operand(
-      classOf[FlinkLogicalJoin],
-      operand(classOf[FlinkLogicalRel], any()),
-      operand(classOf[FlinkLogicalSnapshot], operand(classOf[FlinkLogicalRel], any()))),
-    "TemporalJoinRewriteWithUniqueKeyRule")
-  with CommonTemporalTableJoinRule {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    val snapshot = call.rel[FlinkLogicalSnapshot](2)
-    val snapshotInput = call.rel[FlinkLogicalRel](3)
-
-    val isTemporalJoin = matches(snapshot)
-    val canConvertToLookup = canConvertToLookupJoin(snapshot, snapshotInput)
-    val supportedJoinTypes = Seq(JoinRelType.INNER, JoinRelType.LEFT)
-
-    isTemporalJoin && !canConvertToLookup && supportedJoinTypes.contains(join.getJoinType)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    val leftInput = call.rel[FlinkLogicalRel](1)
-    val snapshot = call.rel[FlinkLogicalSnapshot](2)
-    val snapshotInput = call.rel[FlinkLogicalRel](3)
-
-    val joinCondition = join.getCondition
-
-    val newJoinCondition = joinCondition.accept(new RexShuttle {
-      override def visitCall(call: RexCall): RexNode = {
-        if (call.getOperator == TemporalJoinUtil.INITIAL_TEMPORAL_JOIN_CONDITION) {
-          val (snapshotTimeInputRef, leftJoinKey, rightJoinKey) =
-            if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
-              val snapshotTimeInputRef = call.operands(0)
-              val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
-              val rightJoinKey = call.operands(3).asInstanceOf[RexCall].operands
-              (snapshotTimeInputRef, leftJoinKey, rightJoinKey)
-            } else {
-              val snapshotTimeInputRef = call.operands(0)
-              val leftJoinKey = call.operands(1).asInstanceOf[RexCall].operands
-              val rightJoinKey = call.operands(2).asInstanceOf[RexCall].operands
-              (snapshotTimeInputRef, leftJoinKey, rightJoinKey)
-            }
-
-          val rexBuilder = join.getCluster.getRexBuilder
-          val primaryKeyInputRefs =
-            extractPrimaryKeyInputRefs(leftInput, snapshot, snapshotInput, rexBuilder)
-          validateRightPrimaryKey(join, rightJoinKey, primaryKeyInputRefs)
-
-          if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
-            val rightTimeInputRef = call.operands(1)
-            TemporalJoinUtil.makeRowTimeTemporalTableJoinConCall(
-              rexBuilder,
-              snapshotTimeInputRef,
-              rightTimeInputRef,
-              primaryKeyInputRefs.get,
-              leftJoinKey,
-              rightJoinKey)
-          } else {
-            TemporalJoinUtil.makeProcTimeTemporalTableJoinConCall(
-              rexBuilder,
-              snapshotTimeInputRef,
-              primaryKeyInputRefs.get,
-              leftJoinKey,
-              rightJoinKey)
-          }
-        } else {
-          super.visitCall(call)
-        }
-      }
-    })
-    val rewriteJoin =
-      FlinkLogicalJoin.create(
-        leftInput,
-        snapshot,
-        newJoinCondition,
-        join.getHints,
-        join.getJoinType)
-    call.transformTo(rewriteJoin)
-  }
-
-  private def validateRightPrimaryKey(
-      join: FlinkLogicalJoin,
-      rightJoinKeyExpression: Seq[RexNode],
-      rightPrimaryKeyInputRefs: Option[Seq[RexNode]]): Unit = {
-
-    if (rightPrimaryKeyInputRefs.isEmpty) {
-      throw new ValidationException(
-        "Temporal Table Join requires primary key in versioned table, " +
-          s"but no primary key can be found. " +
-          s"The physical plan is:\n${RelOptUtil.toString(join)}\n")
-    }
-
-    val rightJoinKeyRefIndices = rightJoinKeyExpression
-      .map(rex => rex.asInstanceOf[RexInputRef].getIndex)
-      .toArray
-
-    val rightPrimaryKeyRefIndices = rightPrimaryKeyInputRefs.get
-      .map(rex => rex.asInstanceOf[RexInputRef].getIndex)
-      .toArray
-
-    val primaryKeyContainedInJoinKey = rightPrimaryKeyRefIndices
-      .forall(pk => rightJoinKeyRefIndices.contains(pk))
-
-    if (!primaryKeyContainedInJoinKey) {
-      val joinFieldNames = join.getRowType.getFieldNames
-      val joinLeftFieldNames = join.getLeft.getRowType.getFieldNames
-      val joinRightFieldNamess = join.getRight.getRowType.getFieldNames
-      val primaryKeyNames = rightPrimaryKeyRefIndices
-        .map(i => joinFieldNames.get(i))
-        .toList
-        .mkString(",")
-      val joinEquiInfo = join.analyzeCondition
-        .pairs()
-        .map {
-          pair => joinLeftFieldNames.get(pair.source) + "=" + joinRightFieldNamess.get(pair.target)
-        }
-        .toList
-        .mkString(",")
-      throw new ValidationException(
-        s"Temporal table's primary key [$primaryKeyNames] must be included in the equivalence " +
-          s"condition of temporal join, but current temporal join condition is [$joinEquiInfo].")
-    }
-  }
-
-  private def extractPrimaryKeyInputRefs(
-      leftInput: RelNode,
-      snapshot: FlinkLogicalSnapshot,
-      snapshotInput: FlinkLogicalRel,
-      rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
-    val rightFields = snapshot.getRowType.getFieldList
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
-
-    val upsertKeySet = fmq.getUpsertKeys(snapshotInput)
-    val fields = snapshot.getRowType.getFieldList
-
-    if (upsertKeySet != null && upsertKeySet.size() > 0) {
-      val leftFieldCnt = leftInput.getRowType.getFieldCount
-      val upsertKeySetInputRefs = upsertKeySet
-        .filter(_.nonEmpty)
-        .map(
-          _.toArray
-            .map(fields)
-            // build InputRef of upsert key in snapshot
-            .map(f => rexBuilder.makeInputRef(f.getType, leftFieldCnt + rightFields.indexOf(f)))
-            .toSeq)
-      // select shortest upsert key as primary key
-      upsertKeySetInputRefs.toArray
-        .sortBy(_.length)
-        .headOption
-    } else {
-      None
-    }
-  }
-}
-
-object TemporalJoinRewriteWithUniqueKeyRule {
-  val INSTANCE: TemporalJoinRewriteWithUniqueKeyRule = new TemporalJoinRewriteWithUniqueKeyRule
-}
