Repository: flink Updated Branches: refs/heads/master 85793a25a -> fdf436099
[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder Cartesian products and joins." This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853. Added rewrite rules blow up search space which cannot be effectively pruned without cardinality estimates. This closes #2098 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0b9e8d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0b9e8d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0b9e8d3 Branch: refs/heads/master Commit: e0b9e8d3fef724a8785d51007174dc1e684e5f28 Parents: 85793a2 Author: Fabian Hueske <[email protected]> Authored: Mon Jun 13 18:18:42 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Tue Jun 14 15:05:11 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 2 +- .../api/table/plan/rules/FlinkRuleSets.scala | 3 - .../plan/rules/dataSet/CrossCommuteRule.scala | 42 ----- .../rules/dataSet/JoinCrossAssociateRule.scala | 154 ------------------- .../api/scala/batch/table/JoinITCase.scala | 19 --- 5 files changed, 1 insertion(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index 5b5b7f0..1b25099 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -788,7 +788,7 @@ Among others, the following SQL features are not supported, yet: - Grouping sets - `INTERSECT` and `EXCEPT` set operations -*Note: Tables are joined in the order in which they are specified in the `FROM` clause. Join orders that include Cartesian products are possibly reordered to resolve the Cartesian products into inner equi-joins. Please specify join orders without Cartesian products to avoid such reorderings. +*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.* ### SQL on Streaming Tables http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index cc8fd40..a2ec08d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -60,9 +60,6 @@ object FlinkRuleSets { // join rules JoinPushExpressionsRule.INSTANCE, - // reorder Joins and Cross products to resolve join orders with Cartesian products - JoinCrossAssociateRule.INSTANCE, - CrossCommuteRule.INSTANCE, // remove union with only a single child UnionEliminatorRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala deleted file mode 100644 index 36a083a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.RelOptRuleCall -import org.apache.calcite.rel.core.RelFactories -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rel.rules.JoinCommuteRule - -class CrossCommuteRule(swapOuter: Boolean) extends JoinCommuteRule( - classOf[LogicalJoin], - RelFactories.LOGICAL_BUILDER, - swapOuter) { - - override def matches(call: RelOptRuleCall): Boolean = { - - val join = call.rel(0).asInstanceOf[LogicalJoin] - - // check if Join is a Cartesian product - join.getCondition.isAlwaysTrue - } -} - -object CrossCommuteRule { - val INSTANCE = new CrossCommuteRule(true) -} http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala deleted file mode 100644 index 4f1051a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.plan.{RelOptUtil, RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.volcano.RelSubset -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.{JoinRelType, Join} -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rex.{RexUtil, RexNode, RexPermuteInputsShuttle} -import org.apache.calcite.util.ImmutableBitSet -import org.apache.calcite.util.mapping.Mappings - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * Based on Apache Calcite's JoinAssociateRule. - * JoinAssociateRule could not be extended because it does not provide a public constructor. - * - * - onMatch() is the same as in JoinAssociateRule (except for porting Java to Scala code) - * - matches() was added to restrict the rule to only switch if at least one join is a - * Cartesian product. - * - */ -class JoinCrossAssociateRule extends RelOptRule( - operand(classOf[Join], operand(classOf[Join], any), operand(classOf[RelSubset], any))) { - - override def matches(call: RelOptRuleCall): Boolean = { - - val left = call.rel(0).asInstanceOf[LogicalJoin] - val right = call.rel(1).asInstanceOf[LogicalJoin] - - // check if either Join is a Cartesian product - left.getCondition.isAlwaysTrue || right.getCondition.isAlwaysTrue - } - - def onMatch(call: RelOptRuleCall) { - - val topJoin = call.rel(0).asInstanceOf[Join] - val bottomJoin = call.rel(1).asInstanceOf[Join] - val relA: RelNode = bottomJoin.getLeft - val relB: RelNode = bottomJoin.getRight - val relC = call.rel(2).asInstanceOf[RelSubset] - val cluster = topJoin.getCluster - val rexBuilder = cluster.getRexBuilder - - if (relC.getConvention != relA.getConvention) { - // relC could have any trait-set. But if we're matching say - // EnumerableConvention, we're only interested in enumerable subsets. - return - } - - val aCount = relA.getRowType.getFieldCount - val bCount = relB.getRowType.getFieldCount - val cCount = relC.getRowType.getFieldCount - val aBitSet: ImmutableBitSet = ImmutableBitSet.range(0, aCount) - val bBitSet: ImmutableBitSet = ImmutableBitSet.range(aCount, aCount + bCount) - if (!topJoin.getSystemFieldList.isEmpty) { - // FIXME Enable this rule for joins with system fields - return - } - - // If either join is not inner, we cannot proceed. - // (Is this too strict?) - if (topJoin.getJoinType != JoinRelType.INNER || bottomJoin.getJoinType != JoinRelType.INNER) { - return - } - - // Goal is to transform to - // - // newTopJoin - // / \ - // A newBottomJoin - // / \ - // B C - - // Split the condition of topJoin and bottomJoin into a conjunctions. A - // condition can be pushed down if it does not use columns from A. - val top = new mutable.ArrayBuffer[RexNode]().asJava - val bottom = new mutable.ArrayBuffer[RexNode]().asJava - split(topJoin.getCondition, aBitSet, top, bottom) - split(bottomJoin.getCondition, aBitSet, top, bottom) - - // Mapping for moving conditions from topJoin or bottomJoin to - // newBottomJoin. - // target: | B | C | - // source: | A | B | C | - val bottomMapping: Mappings.TargetMapping = Mappings.createShiftMapping( - aCount + bCount + cCount, 0, aCount, bCount, bCount, aCount + bCount, cCount) - val newBottomList = new mutable.ArrayBuffer[RexNode]().asJava - new RexPermuteInputsShuttle(bottomMapping, relB, relC) - .visitList(bottom, newBottomList) - val newBottomCondition: RexNode = - RexUtil.composeConjunction(rexBuilder, newBottomList, false) - - val newBottomJoin: Join = bottomJoin.copy( - bottomJoin.getTraitSet, newBottomCondition, relB, relC, JoinRelType.INNER, false) - - // Condition for newTopJoin consists of pieces from bottomJoin and topJoin. - // Field ordinals do not need to be changed. - val newTopCondition: RexNode = RexUtil.composeConjunction(rexBuilder, top, false) - val newTopJoin: Join = topJoin.copy( - topJoin.getTraitSet, newTopCondition, relA, newBottomJoin, JoinRelType.INNER, false) - - call.transformTo(newTopJoin) - } - - /** - * Copied from Calcite's JoinPushThroughJoinRule - * - * Splits a condition into conjunctions that do or do not intersect with - * a given bit set. - */ - private def split( - condition: RexNode, - bitSet: ImmutableBitSet, - intersecting: java.util.List[RexNode], - nonIntersecting: java.util.List[RexNode]) { - - import scala.collection.JavaConversions._ - for (node <- RelOptUtil.conjunctions(condition)) { - val inputBitSet: ImmutableBitSet = RelOptUtil.InputFinder.bits(node) - if (bitSet.intersects(inputBitSet)) { - intersecting.add(node) - } - else { - nonIntersecting.add(node) - } - } - } - -} - -object JoinCrossAssociateRule { - val INSTANCE = new JoinCrossAssociateRule() -} http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala index bcc2bef..db629e6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala @@ -374,23 +374,4 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test - def testJoinOrderWithCross(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a1, 'b1, 'c1) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a2, 'b2, 'c2) - val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a3, 'b3, 'c3) - - val joinT = ds1.join(ds2).join(ds3) - .where('a1 === 'a3) - .where('b2 === 'b3) - .select('a1, 'b2) - - val expected = "1,1\n" + "2,2\n" + "2,2\n" + "3,2\n" + "3,2\n" - val results = joinT.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - }
