Repository: flink
Updated Branches:
  refs/heads/master 2bffd7b9e -> 85793a25a


[FLINK-3944] [tableAPI] Add rewrite rules to reorder Cartesian products and 
joins.

These rules are necessary to resolve join orders that initially contain 
Cartesian products
due to the order in which base relations are added in the FROM clause (SQL) or 
joined (Table API).

This closes #2044


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85793a25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85793a25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85793a25

Branch: refs/heads/master
Commit: 85793a25a78ba5be96fabc2a26569318c6b53853
Parents: 2bffd7b
Author: Fabian Hueske <[email protected]>
Authored: Fri May 27 13:04:12 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Fri Jun 10 19:49:40 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |   2 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |   3 +
 .../plan/rules/dataSet/CrossCommuteRule.scala   |  42 +++++
 .../rules/dataSet/JoinCrossAssociateRule.scala  | 154 +++++++++++++++++++
 .../api/scala/batch/table/JoinITCase.scala      |  19 +++
 5 files changed, 219 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85793a25/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 1b25099..5b5b7f0 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -788,7 +788,7 @@ Among others, the following SQL features are not supported, 
yet:
 - Grouping sets
 - `INTERSECT` and `EXCEPT` set operations
 
-*Note: Tables are joined in the order in which they are specified in the 
`FROM` clause. In some cases the table order must be manually tweaked to 
resolve Cartesian products.* 
+*Note: Tables are joined in the order in which they are specified in the 
`FROM` clause. Join orders that include Cartesian products are possibly 
reordered to resolve the Cartesian products into inner equi-joins. Please 
specify join orders without Cartesian products to avoid such reorderings.*
 
 ### SQL on Streaming Tables
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85793a25/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index a2ec08d..cc8fd40 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -60,6 +60,9 @@ object FlinkRuleSets {
 
     // join rules
     JoinPushExpressionsRule.INSTANCE,
+    // reorder Joins and Cross products to resolve join orders with Cartesian 
products
+    JoinCrossAssociateRule.INSTANCE,
+    CrossCommuteRule.INSTANCE,
 
     // remove union with only a single child
     UnionEliminatorRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/85793a25/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
new file mode 100644
index 0000000..36a083a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/CrossCommuteRule.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.RelFactories
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.rules.JoinCommuteRule
+
+class CrossCommuteRule(swapOuter: Boolean) extends JoinCommuteRule(
+    classOf[LogicalJoin],
+    RelFactories.LOGICAL_BUILDER,
+    swapOuter) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+    // check if Join is a Cartesian product
+    join.getCondition.isAlwaysTrue
+  }
+}
+
+object CrossCommuteRule {
+  val INSTANCE = new CrossCommuteRule(true)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85793a25/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
new file mode 100644
index 0000000..4f1051a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/JoinCrossAssociateRule.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptUtil, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{JoinRelType, Join}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.{RexUtil, RexNode, RexPermuteInputsShuttle}
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.calcite.util.mapping.Mappings
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Based on Apache Calcite's JoinAssociateRule.
+  * JoinAssociateRule could not be extended because it does not provide a 
public constructor.
+  *
+  * - onMatch() is the same as in JoinAssociateRule (except for porting Java 
to Scala code)
+  * - matches() was added to restrict the rule to only switch if at least one 
join is a
+  *     Cartesian product.
+  *
+  */
+class JoinCrossAssociateRule extends RelOptRule(
+  operand(classOf[Join], operand(classOf[Join], any), 
operand(classOf[RelSubset], any))) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val left = call.rel(0).asInstanceOf[LogicalJoin]
+    val right = call.rel(1).asInstanceOf[LogicalJoin]
+
+    // check if either Join is a Cartesian product
+    left.getCondition.isAlwaysTrue || right.getCondition.isAlwaysTrue
+  }
+
+  def onMatch(call: RelOptRuleCall) {
+
+    val topJoin = call.rel(0).asInstanceOf[Join]
+    val bottomJoin = call.rel(1).asInstanceOf[Join]
+    val relA: RelNode = bottomJoin.getLeft
+    val relB: RelNode = bottomJoin.getRight
+    val relC = call.rel(2).asInstanceOf[RelSubset]
+    val cluster = topJoin.getCluster
+    val rexBuilder = cluster.getRexBuilder
+
+    if (relC.getConvention != relA.getConvention) {
+      // relC could have any trait-set. But if we're matching say
+      // EnumerableConvention, we're only interested in enumerable subsets.
+      return
+    }
+
+    val aCount = relA.getRowType.getFieldCount
+    val bCount = relB.getRowType.getFieldCount
+    val cCount = relC.getRowType.getFieldCount
+    val aBitSet: ImmutableBitSet = ImmutableBitSet.range(0, aCount)
+    val bBitSet: ImmutableBitSet = ImmutableBitSet.range(aCount, aCount + 
bCount)
+    if (!topJoin.getSystemFieldList.isEmpty) {
+      // FIXME Enable this rule for joins with system fields
+      return
+    }
+
+    // If either join is not inner, we cannot proceed.
+    // (Is this too strict?)
+    if (topJoin.getJoinType != JoinRelType.INNER || bottomJoin.getJoinType != 
JoinRelType.INNER) {
+      return
+    }
+
+    // Goal is to transform to
+    //
+    //       newTopJoin
+    //        /     \
+    //       A   newBottomJoin
+    //               /    \
+    //              B      C
+
+    // Split the condition of topJoin and bottomJoin into conjunctions. A
+    // condition can be pushed down if it does not use columns from A.
+    val top = new mutable.ArrayBuffer[RexNode]().asJava
+    val bottom = new mutable.ArrayBuffer[RexNode]().asJava
+    split(topJoin.getCondition, aBitSet, top, bottom)
+    split(bottomJoin.getCondition, aBitSet, top, bottom)
+
+    // Mapping for moving conditions from topJoin or bottomJoin to
+    // newBottomJoin.
+    // target: | B | C      |
+    // source: | A       | B | C      |
+    val bottomMapping: Mappings.TargetMapping = Mappings.createShiftMapping(
+      aCount + bCount + cCount, 0, aCount, bCount, bCount, aCount + bCount, 
cCount)
+    val newBottomList = new mutable.ArrayBuffer[RexNode]().asJava
+    new RexPermuteInputsShuttle(bottomMapping, relB, relC)
+      .visitList(bottom, newBottomList)
+    val newBottomCondition: RexNode =
+      RexUtil.composeConjunction(rexBuilder, newBottomList, false)
+
+    val newBottomJoin: Join = bottomJoin.copy(
+      bottomJoin.getTraitSet, newBottomCondition, relB, relC, 
JoinRelType.INNER, false)
+
+    // Condition for newTopJoin consists of pieces from bottomJoin and topJoin.
+    // Field ordinals do not need to be changed.
+    val newTopCondition: RexNode = RexUtil.composeConjunction(rexBuilder, top, 
false)
+    val newTopJoin: Join = topJoin.copy(
+      topJoin.getTraitSet, newTopCondition, relA, newBottomJoin, 
JoinRelType.INNER, false)
+
+    call.transformTo(newTopJoin)
+  }
+
+  /**
+    * Copied from Calcite's JoinPushThroughJoinRule
+    *
+    * Splits a condition into conjunctions that do or do not intersect with
+    * a given bit set.
+    */
+  private def split(
+      condition: RexNode,
+      bitSet: ImmutableBitSet,
+      intersecting: java.util.List[RexNode],
+      nonIntersecting: java.util.List[RexNode]) {
+
+    import scala.collection.JavaConversions._
+    for (node <- RelOptUtil.conjunctions(condition)) {
+      val inputBitSet: ImmutableBitSet = RelOptUtil.InputFinder.bits(node)
+      if (bitSet.intersects(inputBitSet)) {
+        intersecting.add(node)
+      }
+      else {
+        nonIntersecting.add(node)
+      }
+    }
+  }
+
+}
+
+object JoinCrossAssociateRule {
+  val INSTANCE = new JoinCrossAssociateRule()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85793a25/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index db629e6..bcc2bef 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -374,4 +374,23 @@ class JoinITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testJoinOrderWithCross(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a1, 
'b1, 'c1)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a2, 
'b2, 'c2)
+    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a3, 
'b3, 'c3)
+
+    val joinT = ds1.join(ds2).join(ds3)
+      .where('a1 === 'a3)
+      .where('b2 === 'b3)
+      .select('a1, 'b2)
+
+    val expected = "1,1\n" + "2,2\n" + "2,2\n" + "3,2\n" + "3,2\n"
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }

Reply via email to