Repository: flink
Updated Branches:
  refs/heads/master 85793a25a -> fdf436099


[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder 
Cartesian products and joins."

This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853.
The added rewrite rules blow up the search space, which cannot be effectively 
pruned without cardinality estimates.

This closes #2098


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0b9e8d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0b9e8d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0b9e8d3

Branch: refs/heads/master
Commit: e0b9e8d3fef724a8785d51007174dc1e684e5f28
Parents: 85793a2
Author: Fabian Hueske <[email protected]>
Authored: Mon Jun 13 18:18:42 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Tue Jun 14 15:05:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |   2 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |   3 -
 .../plan/rules/dataSet/CrossCommuteRule.scala   |  42 -----
 .../rules/dataSet/JoinCrossAssociateRule.scala  | 154 -------------------
 .../api/scala/batch/table/JoinITCase.scala      |  19 ---
 5 files changed, 1 insertion(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 5b5b7f0..1b25099 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -788,7 +788,7 @@ Among others, the following SQL features are not supported, yet:
 - Grouping sets
 - `INTERSECT` and `EXCEPT` set operations
 
-*Note: Tables are joined in the order in which they are specified in the `FROM` clause. Join orders that include Cartesian products are possibly reordered to resolve the Cartesian products into inner equi-joins. Please specify join orders without Cartesian products to avoid such reorderings.
+*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.* 
 
 ### SQL on Streaming Tables
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index cc8fd40..a2ec08d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -60,9 +60,6 @@ object FlinkRuleSets {
 
     // join rules
     JoinPushExpressionsRule.INSTANCE,
-    // reorder Joins and Cross products to resolve join orders with Cartesian products
-    JoinCrossAssociateRule.INSTANCE,
-    CrossCommuteRule.INSTANCE,
 
     // remove union with only a single child
     UnionEliminatorRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
deleted file mode 100644
index 36a083a..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRuleCall
-import org.apache.calcite.rel.core.RelFactories
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rel.rules.JoinCommuteRule
-
-class CrossCommuteRule(swapOuter: Boolean) extends JoinCommuteRule(
-    classOf[LogicalJoin],
-    RelFactories.LOGICAL_BUILDER,
-    swapOuter) {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-
-    val join = call.rel(0).asInstanceOf[LogicalJoin]
-
-    // check if Join is a Cartesian product
-    join.getCondition.isAlwaysTrue
-  }
-}
-
-object CrossCommuteRule {
-  val INSTANCE = new CrossCommuteRule(true)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
deleted file mode 100644
index 4f1051a..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.plan.{RelOptUtil, RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{JoinRelType, Join}
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rex.{RexUtil, RexNode, RexPermuteInputsShuttle}
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.calcite.util.mapping.Mappings
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
-  * Based on Apache Calcite's JoinAssociateRule.
-  * JoinAssociateRule could not be extended because it does not provide a 
public constructor.
-  *
-  * - onMatch() is the same as in JoinAssociateRule (except for porting Java 
to Scala code)
-  * - matches() was added to restrict the rule to only switch if at least one 
join is a
-  *     Cartesian product.
-  *
-  */
-class JoinCrossAssociateRule extends RelOptRule(
-  operand(classOf[Join], operand(classOf[Join], any), 
operand(classOf[RelSubset], any))) {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-
-    val left = call.rel(0).asInstanceOf[LogicalJoin]
-    val right = call.rel(1).asInstanceOf[LogicalJoin]
-
-    // check if either Join is a Cartesian product
-    left.getCondition.isAlwaysTrue || right.getCondition.isAlwaysTrue
-  }
-
-  def onMatch(call: RelOptRuleCall) {
-
-    val topJoin = call.rel(0).asInstanceOf[Join]
-    val bottomJoin = call.rel(1).asInstanceOf[Join]
-    val relA: RelNode = bottomJoin.getLeft
-    val relB: RelNode = bottomJoin.getRight
-    val relC = call.rel(2).asInstanceOf[RelSubset]
-    val cluster = topJoin.getCluster
-    val rexBuilder = cluster.getRexBuilder
-
-    if (relC.getConvention != relA.getConvention) {
-      // relC could have any trait-set. But if we're matching say
-      // EnumerableConvention, we're only interested in enumerable subsets.
-      return
-    }
-
-    val aCount = relA.getRowType.getFieldCount
-    val bCount = relB.getRowType.getFieldCount
-    val cCount = relC.getRowType.getFieldCount
-    val aBitSet: ImmutableBitSet = ImmutableBitSet.range(0, aCount)
-    val bBitSet: ImmutableBitSet = ImmutableBitSet.range(aCount, aCount + 
bCount)
-    if (!topJoin.getSystemFieldList.isEmpty) {
-      // FIXME Enable this rule for joins with system fields
-      return
-    }
-
-    // If either join is not inner, we cannot proceed.
-    // (Is this too strict?)
-    if (topJoin.getJoinType != JoinRelType.INNER || bottomJoin.getJoinType != 
JoinRelType.INNER) {
-      return
-    }
-
-    // Goal is to transform to
-    //
-    //       newTopJoin
-    //        /     \
-    //       A   newBottomJoin
-    //               /    \
-    //              B      C
-
-    // Split the condition of topJoin and bottomJoin into a conjunctions. A
-    // condition can be pushed down if it does not use columns from A.
-    val top = new mutable.ArrayBuffer[RexNode]().asJava
-    val bottom = new mutable.ArrayBuffer[RexNode]().asJava
-    split(topJoin.getCondition, aBitSet, top, bottom)
-    split(bottomJoin.getCondition, aBitSet, top, bottom)
-
-    // Mapping for moving conditions from topJoin or bottomJoin to
-    // newBottomJoin.
-    // target: | B | C      |
-    // source: | A       | B | C      |
-    val bottomMapping: Mappings.TargetMapping = Mappings.createShiftMapping(
-      aCount + bCount + cCount, 0, aCount, bCount, bCount, aCount + bCount, 
cCount)
-    val newBottomList = new mutable.ArrayBuffer[RexNode]().asJava
-    new RexPermuteInputsShuttle(bottomMapping, relB, relC)
-      .visitList(bottom, newBottomList)
-    val newBottomCondition: RexNode =
-      RexUtil.composeConjunction(rexBuilder, newBottomList, false)
-
-    val newBottomJoin: Join = bottomJoin.copy(
-      bottomJoin.getTraitSet, newBottomCondition, relB, relC, 
JoinRelType.INNER, false)
-
-    // Condition for newTopJoin consists of pieces from bottomJoin and topJoin.
-    // Field ordinals do not need to be changed.
-    val newTopCondition: RexNode = RexUtil.composeConjunction(rexBuilder, top, 
false)
-    val newTopJoin: Join = topJoin.copy(
-      topJoin.getTraitSet, newTopCondition, relA, newBottomJoin, 
JoinRelType.INNER, false)
-
-    call.transformTo(newTopJoin)
-  }
-
-  /**
-    * Copied from Calcite's JoinPushThroughJoinRule
-    *
-    * Splits a condition into conjunctions that do or do not intersect with
-    * a given bit set.
-    */
-  private def split(
-      condition: RexNode,
-      bitSet: ImmutableBitSet,
-      intersecting: java.util.List[RexNode],
-      nonIntersecting: java.util.List[RexNode]) {
-
-    import scala.collection.JavaConversions._
-    for (node <- RelOptUtil.conjunctions(condition)) {
-      val inputBitSet: ImmutableBitSet = RelOptUtil.InputFinder.bits(node)
-      if (bitSet.intersects(inputBitSet)) {
-        intersecting.add(node)
-      }
-      else {
-        nonIntersecting.add(node)
-      }
-    }
-  }
-
-}
-
-object JoinCrossAssociateRule {
-  val INSTANCE = new JoinCrossAssociateRule()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b9e8d3/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index bcc2bef..db629e6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -374,23 +374,4 @@ class JoinITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
-  def testJoinOrderWithCross(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a1, 
'b1, 'c1)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a2, 
'b2, 'c2)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a3, 
'b3, 'c3)
-
-    val joinT = ds1.join(ds2).join(ds3)
-      .where('a1 === 'a3)
-      .where('b2 === 'b3)
-      .select('a1, 'b2)
-
-    val expected = "1,1\n" + "2,2\n" + "2,2\n" + "3,2\n" + "3,2\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
 }

Reply via email to