This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6a12668bcfe [FLINK-34493][table] Migrate ReplaceMinusWithAntiJoinRule to java. 6a12668bcfe is described below commit 6a12668bcfe651fa938517eb2da4d537ce6ce668 Author: liuyongvs <liuyon...@gmail.com> AuthorDate: Fri Mar 1 16:08:52 2024 +0800 [FLINK-34493][table] Migrate ReplaceMinusWithAntiJoinRule to java. --- .../logical/ReplaceMinusWithAntiJoinRule.java | 95 ++++++++++++++++++++++ .../logical/ReplaceMinusWithAntiJoinRule.scala | 65 --------------- 2 files changed, 95 insertions(+), 65 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java new file mode 100644 index 00000000000..35c719e3846 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Minus} (SQL keyword: + * EXCEPT) with a distinct {@link org.apache.calcite.rel.core.Aggregate} on an ANTI {@link + * org.apache.calcite.rel.core.Join}. + * + * <p>Only handle the case of input size 2. + */ +@Value.Enclosing +public class ReplaceMinusWithAntiJoinRule + extends RelRule<ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig> { + + public static final ReplaceMinusWithAntiJoinRule INSTANCE = + ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.DEFAULT.toRule(); + + private ReplaceMinusWithAntiJoinRule(ReplaceMinusWithAntiJoinRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Minus minus = call.rel(0); + return !minus.all && minus.getInputs().size() == 2; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Minus minus = call.rel(0); + RelNode left = minus.getInput(0); + RelNode right = minus.getInput(1); + + RelBuilder relBuilder = call.builder(); + List<Integer> keys = Util.range(left.getRowType().getFieldCount()); + List<RexNode> conditions = generateEqualsCondition(relBuilder, left, right, keys); + + relBuilder.push(left); + relBuilder.push(right); + relBuilder + .join(JoinRelType.ANTI, conditions) + .aggregate( + relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); + RelNode rel = relBuilder.build(); + call.transformTo(rel); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface ReplaceMinusWithAntiJoinRuleConfig extends RelRule.Config { + ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig DEFAULT = + ImmutableReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.builder() + .build() + .withOperandSupplier(b0 -> b0.operand(Minus.class).anyInputs()) + .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) + .withDescription("ReplaceMinusWithAntiJoinRule"); + + @Override + default ReplaceMinusWithAntiJoinRule toRule() { + return new ReplaceMinusWithAntiJoinRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala deleted file mode 100644 index 0b080b86391..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.core._ - -import scala.collection.JavaConverters._ - -/** - * Planner rule that replaces distinct [[Minus]] (SQL keyword: EXCEPT) with a distinct [[Aggregate]] - * on an ANTI [[Join]]. - * - * Only handle the case of input size 2. - */ -class ReplaceMinusWithAntiJoinRule - extends RelOptRule( - operand(classOf[Minus], any), - RelFactories.LOGICAL_BUILDER, - "ReplaceMinusWithAntiJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val minus: Minus = call.rel(0) - !minus.all && minus.getInputs.size() == 2 - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val minus: Minus = call.rel(0) - val left = minus.getInput(0) - val right = minus.getInput(1) - - val relBuilder = call.builder - val keys = 0 until left.getRowType.getFieldCount - val conditions = - generateEqualsCondition(relBuilder, left, right, keys.map(Integer.valueOf).toList.asJava) - - relBuilder.push(left) - relBuilder.push(right) - relBuilder.join(JoinRelType.ANTI, conditions).aggregate(relBuilder.groupKey(keys: _*)) - val rel = relBuilder.build() - call.transformTo(rel) - } -} - -object ReplaceMinusWithAntiJoinRule { - val INSTANCE: RelOptRule = new ReplaceMinusWithAntiJoinRule -}