Repository: spark Updated Branches: refs/heads/master 12206058e -> 4000f128b
[SPARK-20231][SQL] Refactor star schema code for the subsequent star join detection in CBO ## What changes were proposed in this pull request? This commit moves star schema code from ```joins.scala``` to ```StarSchemaDetection.scala```. It also applies some minor fixes in ```StarJoinReorderSuite.scala```. ## How was this patch tested? Run existing ```StarJoinReorderSuite.scala```. Author: Ioana Delaney <ioanamdela...@gmail.com> Closes #17544 from ioana-delaney/starSchemaCBOv2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4000f128 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4000f128 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4000f128 Branch: refs/heads/master Commit: 4000f128b7101484ba618115504ca916c22fa84a Parents: 1220605 Author: Ioana Delaney <ioanamdela...@gmail.com> Authored: Wed Apr 5 18:02:53 2017 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Wed Apr 5 18:02:53 2017 -0700 ---------------------------------------------------------------------- .../optimizer/StarSchemaDetection.scala | 351 +++++++++++++++++++ .../spark/sql/catalyst/optimizer/joins.scala | 328 +---------------- .../optimizer/StarJoinReorderSuite.scala | 4 +- 3 files changed, 354 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4000f128/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala new file mode 100644 index 0000000..91cb004 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -0,0 +1,351 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.annotation.tailrec + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf + +/** + * Encapsulates star-schema detection logic. + */ +case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper { + + /** + * Star schema consists of one or more fact tables referencing a number of dimension + * tables. In general, star-schema joins are detected using the following conditions: + * 1. Informational RI constraints (reliable detection) + * + Dimension contains a primary key that is being joined to the fact table. + * + Fact table contains foreign keys referencing multiple dimension tables. + * 2. Cardinality based heuristics + * + Usually, the table with the highest cardinality is the fact table. + * + Table being joined with the most number of tables is the fact table. + * + * To detect star joins, the algorithm uses a combination of the above two conditions. 
+ * The fact table is chosen based on the cardinality heuristics, and the dimension + * tables are chosen based on the RI constraints. A star join will consist of the largest + * fact table joined with the dimension tables on their primary keys. To detect that a + * column is a primary key, the algorithm uses table and column statistics. + * + * The algorithm currently returns only the star join with the largest fact table. + * Choosing the largest fact table on the driving arm to avoid large inners is in + * general a good heuristic. This restriction will be lifted to observe multiple + * star joins. + * + * The highlights of the algorithm are the following: + * + * Given a set of joined tables/plans, the algorithm first verifies if they are eligible + * for star join detection. An eligible plan is a base table access with valid statistics. + * A base table access represents Project or Filter operators above a LeafNode. Conservatively, + * the algorithm only considers base table access as part of a star join since they provide + * reliable statistics. This restriction can be lifted with the CBO enablement by default. + * + * If some of the plans are not base table access, or statistics are not available, the algorithm + * returns an empty star join plan since, in the absence of statistics, it cannot make + * good planning decisions. Otherwise, the algorithm finds the table with the largest cardinality + * (number of rows), which is assumed to be a fact table. + * + * Next, it computes the set of dimension tables for the current fact table. A dimension table + * is assumed to be in a RI relationship with a fact table. To infer column uniqueness, + * the algorithm compares the number of distinct values with the total number of rows in the + * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted + * based on 1TB TPC-DS data), the column is assumed to be unique. 
+ */ + def findStarJoins( + input: Seq[LogicalPlan], + conditions: Seq[Expression]): Seq[LogicalPlan] = { + + val emptyStarJoinPlan = Seq.empty[LogicalPlan] + + if (!conf.starSchemaDetection || input.size < 2) { + emptyStarJoinPlan + } else { + // Find if the input plans are eligible for star join detection. + // An eligible plan is a base table access with valid statistics. + val foundEligibleJoin = input.forall { + case PhysicalOperation(_, _, t: LeafNode) if t.stats(conf).rowCount.isDefined => true + case _ => false + } + + if (!foundEligibleJoin) { + // Some plans don't have stats or are complex plans. Conservatively, + // return an empty star join. This restriction can be lifted + // once statistics are propagated in the plan. + emptyStarJoinPlan + } else { + // Find the fact table using cardinality based heuristics i.e. + // the table with the largest number of rows. + val sortedFactTables = input.map { plan => + TableAccessCardinality(plan, getTableAccessCardinality(plan)) + }.collect { case t @ TableAccessCardinality(_, Some(_)) => + t + }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse) + + sortedFactTables match { + case Nil => + emptyStarJoinPlan + case table1 :: table2 :: _ + if table2.size.get.toDouble > conf.starSchemaFTRatio * table1.size.get.toDouble => + // If the top largest tables have comparable number of rows, return an empty star plan. + // This restriction will be lifted when the algorithm is generalized + // to return multiple star plans. + emptyStarJoinPlan + case TableAccessCardinality(factTable, _) :: rest => + // Find the fact table joins. + val allFactJoins = rest.collect { case TableAccessCardinality(plan, _) + if findJoinConditions(factTable, plan, conditions).nonEmpty => + plan + } + + // Find the corresponding join conditions. + val allFactJoinCond = allFactJoins.flatMap { plan => + val joinCond = findJoinConditions(factTable, plan, conditions) + joinCond + } + + // Verify if the join columns have valid statistics. 
+ // Allow any relational comparison between the tables. Later + // we will heuristically choose a subset of equi-join + // tables. + val areStatsAvailable = allFactJoins.forall { dimTable => + allFactJoinCond.exists { + case BinaryComparison(lhs: AttributeReference, rhs: AttributeReference) => + val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs + val factCol = if (factTable.outputSet.contains(lhs)) lhs else rhs + hasStatistics(dimCol, dimTable) && hasStatistics(factCol, factTable) + case _ => false + } + } + + if (!areStatsAvailable) { + emptyStarJoinPlan + } else { + // Find the subset of dimension tables. A dimension table is assumed to be in a + // RI relationship with the fact table. Only consider equi-joins + // between a fact and a dimension table to avoid expanding joins. + val eligibleDimPlans = allFactJoins.filter { dimTable => + allFactJoinCond.exists { + case cond @ Equality(lhs: AttributeReference, rhs: AttributeReference) => + val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs + isUnique(dimCol, dimTable) + case _ => false + } + } + + if (eligibleDimPlans.isEmpty || eligibleDimPlans.size < 2) { + // An eligible star join was not found since the join is not + // an RI join, or the star join is an expanding join. + // Also, a star would involve more than one dimension table. + emptyStarJoinPlan + } else { + factTable +: eligibleDimPlans + } + } + } + } + } + } + + /** + * Determines if a column referenced by a base table access is a primary key. + * A column is a PK if it is not nullable and has unique values. + * To determine if a column has unique values in the absence of informational + * RI constraints, the number of distinct values is compared to the total + * number of rows in the table. If their relative difference + * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based + * on TPC-DS data results), the column is assumed to have unique values. 
+ */ + private def isUnique( + column: Attribute, + plan: LogicalPlan): Boolean = plan match { + case PhysicalOperation(_, _, t: LeafNode) => + val leafCol = findLeafNodeCol(column, plan) + leafCol match { + case Some(col) if t.outputSet.contains(col) => + val stats = t.stats(conf) + stats.rowCount match { + case Some(rowCount) if rowCount >= 0 => + if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) { + val colStats = stats.attributeStats.get(col) + if (colStats.get.nullCount > 0) { + false + } else { + val distinctCount = colStats.get.distinctCount + val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d) + // ndvMaxErr adjusted based on TPCDS 1TB data results + relDiff <= conf.ndvMaxError * 2 + } + } else { + false + } + case None => false + } + case None => false + } + case _ => false + } + + /** + * Given a column over a base table access, it returns + * the leaf node column from which the input column is derived. + */ + @tailrec + private def findLeafNodeCol( + column: Attribute, + plan: LogicalPlan): Option[Attribute] = plan match { + case pl @ PhysicalOperation(_, _, _: LeafNode) => + pl match { + case t: LeafNode if t.outputSet.contains(column) => + Option(column) + case p: Project if p.outputSet.exists(_.semanticEquals(column)) => + val col = p.outputSet.find(_.semanticEquals(column)).get + findLeafNodeCol(col, p.child) + case f: Filter => + findLeafNodeCol(column, f.child) + case _ => None + } + case _ => None + } + + /** + * Checks if a column has statistics. + * The column is assumed to be over a base table access. 
+ */ + private def hasStatistics( + column: Attribute, + plan: LogicalPlan): Boolean = plan match { + case PhysicalOperation(_, _, t: LeafNode) => + val leafCol = findLeafNodeCol(column, plan) + leafCol match { + case Some(col) if t.outputSet.contains(col) => + val stats = t.stats(conf) + stats.attributeStats.nonEmpty && stats.attributeStats.contains(col) + case None => false + } + case _ => false + } + + /** + * Returns the join predicates between two input plans. It only + * considers basic comparison operators. + */ + @inline + private def findJoinConditions( + plan1: LogicalPlan, + plan2: LogicalPlan, + conditions: Seq[Expression]): Seq[Expression] = { + val refs = plan1.outputSet ++ plan2.outputSet + conditions.filter { + case BinaryComparison(_, _) => true + case _ => false + }.filterNot(canEvaluate(_, plan1)) + .filterNot(canEvaluate(_, plan2)) + .filter(_.references.subsetOf(refs)) + } + + /** + * Checks if a star join is a selective join. A star join is assumed + * to be selective if there are local predicates on the dimension + * tables. + */ + private def isSelectiveStarJoin( + dimTables: Seq[LogicalPlan], + conditions: Seq[Expression]): Boolean = dimTables.exists { + case plan @ PhysicalOperation(_, p, _: LeafNode) => + // Checks if any condition applies to the dimension tables. + // Exclude the IsNotNull predicates until predicate selectivity is available. + // In most cases, this predicate is artificially introduced by the Optimizer + // to enforce nullability constraints. + val localPredicates = conditions.filterNot(_.isInstanceOf[IsNotNull]) + .exists(canEvaluate(_, plan)) + + // Checks if there are any predicates pushed down to the base table access. + val pushedDownPredicates = p.nonEmpty && !p.forall(_.isInstanceOf[IsNotNull]) + + localPredicates || pushedDownPredicates + case _ => false + } + + /** + * Helper case class to hold (plan, rowCount) pairs. 
+ */ + private case class TableAccessCardinality(plan: LogicalPlan, size: Option[BigInt]) + + /** + * Returns the cardinality of a base table access. A base table access represents + * a LeafNode, or Project or Filter operators above a LeafNode. + */ + private def getTableAccessCardinality( + input: LogicalPlan): Option[BigInt] = input match { + case PhysicalOperation(_, cond, t: LeafNode) if t.stats(conf).rowCount.isDefined => + if (conf.cboEnabled && input.stats(conf).rowCount.isDefined) { + Option(input.stats(conf).rowCount.get) + } else { + Option(t.stats(conf).rowCount.get) + } + case _ => None + } + + /** + * Reorders a star join based on heuristics. It is called from ReorderJoin if CBO is disabled. + * 1) Finds the star join with the largest fact table. + * 2) Places the fact table the driving arm of the left-deep tree. + * This plan avoids large table access on the inner, and thus favor hash joins. + * 3) Applies the most selective dimensions early in the plan to reduce the amount of + * data flow. + */ + def reorderStarJoins( + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = { + assert(input.size >= 2) + + val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)] + + // Find the eligible star plans. Currently, it only returns + // the star join with the largest fact table. + val eligibleJoins = input.collect{ case (plan, Inner) => plan } + val starPlan = findStarJoins(eligibleJoins, conditions) + + if (starPlan.isEmpty) { + emptyStarJoinPlan + } else { + val (factTable, dimTables) = (starPlan.head, starPlan.tail) + + // Only consider selective joins. This case is detected by observing local predicates + // on the dimension tables. In a star schema relationship, the join between the fact and the + // dimension table is a FK-PK join. Heuristically, a selective dimension may reduce + // the result of a join. 
+ if (isSelectiveStarJoin(dimTables, conditions)) { + val reorderDimTables = dimTables.map { plan => + TableAccessCardinality(plan, getTableAccessCardinality(plan)) + }.sortBy(_.size).map { + case TableAccessCardinality(p1, _) => p1 + } + + val reorderStarPlan = factTable +: reorderDimTables + reorderStarPlan.map(plan => (plan, Inner)) + } else { + emptyStarJoinPlan + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4000f128/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 250dd07..c3ab587 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -20,339 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, PhysicalOperation} +import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.internal.SQLConf /** - * Encapsulates star-schema join detection. - */ -case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper { - - /** - * Star schema consists of one or more fact tables referencing a number of dimension - * tables. In general, star-schema joins are detected using the following conditions: - * 1. Informational RI constraints (reliable detection) - * + Dimension contains a primary key that is being joined to the fact table. 
- * + Fact table contains foreign keys referencing multiple dimension tables. - * 2. Cardinality based heuristics - * + Usually, the table with the highest cardinality is the fact table. - * + Table being joined with the most number of tables is the fact table. - * - * To detect star joins, the algorithm uses a combination of the above two conditions. - * The fact table is chosen based on the cardinality heuristics, and the dimension - * tables are chosen based on the RI constraints. A star join will consist of the largest - * fact table joined with the dimension tables on their primary keys. To detect that a - * column is a primary key, the algorithm uses table and column statistics. - * - * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only - * the star join with the largest fact table. Choosing the largest fact table on the - * driving arm to avoid large inners is in general a good heuristic. This restriction can - * be lifted with support for bushy tree plans. - * - * The highlights of the algorithm are the following: - * - * Given a set of joined tables/plans, the algorithm first verifies if they are eligible - * for star join detection. An eligible plan is a base table access with valid statistics. - * A base table access represents Project or Filter operators above a LeafNode. Conservatively, - * the algorithm only considers base table access as part of a star join since they provide - * reliable statistics. - * - * If some of the plans are not base table access, or statistics are not available, the algorithm - * returns an empty star join plan since, in the absence of statistics, it cannot make - * good planning decisions. Otherwise, the algorithm finds the table with the largest cardinality - * (number of rows), which is assumed to be a fact table. - * - * Next, it computes the set of dimension tables for the current fact table. A dimension table - * is assumed to be in a RI relationship with a fact table. 
To infer column uniqueness, - * the algorithm compares the number of distinct values with the total number of rows in the - * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted - * based on 1TB TPC-DS data), the column is assumed to be unique. - */ - def findStarJoins( - input: Seq[LogicalPlan], - conditions: Seq[Expression]): Seq[Seq[LogicalPlan]] = { - - val emptyStarJoinPlan = Seq.empty[Seq[LogicalPlan]] - - if (!conf.starSchemaDetection || input.size < 2) { - emptyStarJoinPlan - } else { - // Find if the input plans are eligible for star join detection. - // An eligible plan is a base table access with valid statistics. - val foundEligibleJoin = input.forall { - case PhysicalOperation(_, _, t: LeafNode) if t.stats(conf).rowCount.isDefined => true - case _ => false - } - - if (!foundEligibleJoin) { - // Some plans don't have stats or are complex plans. Conservatively, - // return an empty star join. This restriction can be lifted - // once statistics are propagated in the plan. - emptyStarJoinPlan - } else { - // Find the fact table using cardinality based heuristics i.e. - // the table with the largest number of rows. - val sortedFactTables = input.map { plan => - TableAccessCardinality(plan, getTableAccessCardinality(plan)) - }.collect { case t @ TableAccessCardinality(_, Some(_)) => - t - }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse) - - sortedFactTables match { - case Nil => - emptyStarJoinPlan - case table1 :: table2 :: _ - if table2.size.get.toDouble > conf.starSchemaFTRatio * table1.size.get.toDouble => - // If the top largest tables have comparable number of rows, return an empty star plan. - // This restriction will be lifted when the algorithm is generalized - // to return multiple star plans. - emptyStarJoinPlan - case TableAccessCardinality(factTable, _) :: rest => - // Find the fact table joins. 
- val allFactJoins = rest.collect { case TableAccessCardinality(plan, _) - if findJoinConditions(factTable, plan, conditions).nonEmpty => - plan - } - - // Find the corresponding join conditions. - val allFactJoinCond = allFactJoins.flatMap { plan => - val joinCond = findJoinConditions(factTable, plan, conditions) - joinCond - } - - // Verify if the join columns have valid statistics. - // Allow any relational comparison between the tables. Later - // we will heuristically choose a subset of equi-join - // tables. - val areStatsAvailable = allFactJoins.forall { dimTable => - allFactJoinCond.exists { - case BinaryComparison(lhs: AttributeReference, rhs: AttributeReference) => - val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs - val factCol = if (factTable.outputSet.contains(lhs)) lhs else rhs - hasStatistics(dimCol, dimTable) && hasStatistics(factCol, factTable) - case _ => false - } - } - - if (!areStatsAvailable) { - emptyStarJoinPlan - } else { - // Find the subset of dimension tables. A dimension table is assumed to be in a - // RI relationship with the fact table. Only consider equi-joins - // between a fact and a dimension table to avoid expanding joins. - val eligibleDimPlans = allFactJoins.filter { dimTable => - allFactJoinCond.exists { - case cond @ Equality(lhs: AttributeReference, rhs: AttributeReference) => - val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs - isUnique(dimCol, dimTable) - case _ => false - } - } - - if (eligibleDimPlans.isEmpty) { - // An eligible star join was not found because the join is not - // an RI join, or the star join is an expanding join. - emptyStarJoinPlan - } else { - Seq(factTable +: eligibleDimPlans) - } - } - } - } - } - } - - /** - * Reorders a star join based on heuristics: - * 1) Finds the star join with the largest fact table and places it on the driving - * arm of the left-deep tree. This plan avoids large table access on the inner, and - * thus favor hash joins. 
- * 2) Applies the most selective dimensions early in the plan to reduce the amount of - * data flow. - */ - def reorderStarJoins( - input: Seq[(LogicalPlan, InnerLike)], - conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = { - assert(input.size >= 2) - - val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)] - - // Find the eligible star plans. Currently, it only returns - // the star join with the largest fact table. - val eligibleJoins = input.collect{ case (plan, Inner) => plan } - val starPlans = findStarJoins(eligibleJoins, conditions) - - if (starPlans.isEmpty) { - emptyStarJoinPlan - } else { - val starPlan = starPlans.head - val (factTable, dimTables) = (starPlan.head, starPlan.tail) - - // Only consider selective joins. This case is detected by observing local predicates - // on the dimension tables. In a star schema relationship, the join between the fact and the - // dimension table is a FK-PK join. Heuristically, a selective dimension may reduce - // the result of a join. - // Also, conservatively assume that a fact table is joined with more than one dimension. - if (dimTables.size >= 2 && isSelectiveStarJoin(dimTables, conditions)) { - val reorderDimTables = dimTables.map { plan => - TableAccessCardinality(plan, getTableAccessCardinality(plan)) - }.sortBy(_.size).map { - case TableAccessCardinality(p1, _) => p1 - } - - val reorderStarPlan = factTable +: reorderDimTables - reorderStarPlan.map(plan => (plan, Inner)) - } else { - emptyStarJoinPlan - } - } - } - - /** - * Determines if a column referenced by a base table access is a primary key. - * A column is a PK if it is not nullable and has unique values. - * To determine if a column has unique values in the absence of informational - * RI constraints, the number of distinct values is compared to the total - * number of rows in the table. If their relative difference - * is within the expected limits (i.e. 
2 * spark.sql.statistics.ndv.maxError based - * on TPCDS data results), the column is assumed to have unique values. - */ - private def isUnique( - column: Attribute, - plan: LogicalPlan): Boolean = plan match { - case PhysicalOperation(_, _, t: LeafNode) => - val leafCol = findLeafNodeCol(column, plan) - leafCol match { - case Some(col) if t.outputSet.contains(col) => - val stats = t.stats(conf) - stats.rowCount match { - case Some(rowCount) if rowCount >= 0 => - if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) { - val colStats = stats.attributeStats.get(col) - if (colStats.get.nullCount > 0) { - false - } else { - val distinctCount = colStats.get.distinctCount - val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d) - // ndvMaxErr adjusted based on TPCDS 1TB data results - relDiff <= conf.ndvMaxError * 2 - } - } else { - false - } - case None => false - } - case None => false - } - case _ => false - } - - /** - * Given a column over a base table access, it returns - * the leaf node column from which the input column is derived. - */ - @tailrec - private def findLeafNodeCol( - column: Attribute, - plan: LogicalPlan): Option[Attribute] = plan match { - case pl @ PhysicalOperation(_, _, _: LeafNode) => - pl match { - case t: LeafNode if t.outputSet.contains(column) => - Option(column) - case p: Project if p.outputSet.exists(_.semanticEquals(column)) => - val col = p.outputSet.find(_.semanticEquals(column)).get - findLeafNodeCol(col, p.child) - case f: Filter => - findLeafNodeCol(column, f.child) - case _ => None - } - case _ => None - } - - /** - * Checks if a column has statistics. - * The column is assumed to be over a base table access. 
- */ - private def hasStatistics( - column: Attribute, - plan: LogicalPlan): Boolean = plan match { - case PhysicalOperation(_, _, t: LeafNode) => - val leafCol = findLeafNodeCol(column, plan) - leafCol match { - case Some(col) if t.outputSet.contains(col) => - val stats = t.stats(conf) - stats.attributeStats.nonEmpty && stats.attributeStats.contains(col) - case None => false - } - case _ => false - } - - /** - * Returns the join predicates between two input plans. It only - * considers basic comparison operators. - */ - @inline - private def findJoinConditions( - plan1: LogicalPlan, - plan2: LogicalPlan, - conditions: Seq[Expression]): Seq[Expression] = { - val refs = plan1.outputSet ++ plan2.outputSet - conditions.filter { - case BinaryComparison(_, _) => true - case _ => false - }.filterNot(canEvaluate(_, plan1)) - .filterNot(canEvaluate(_, plan2)) - .filter(_.references.subsetOf(refs)) - } - - /** - * Checks if a star join is a selective join. A star join is assumed - * to be selective if there are local predicates on the dimension - * tables. - */ - private def isSelectiveStarJoin( - dimTables: Seq[LogicalPlan], - conditions: Seq[Expression]): Boolean = dimTables.exists { - case plan @ PhysicalOperation(_, p, _: LeafNode) => - // Checks if any condition applies to the dimension tables. - // Exclude the IsNotNull predicates until predicate selectivity is available. - // In most cases, this predicate is artificially introduced by the Optimizer - // to enforce nullability constraints. - val localPredicates = conditions.filterNot(_.isInstanceOf[IsNotNull]) - .exists(canEvaluate(_, plan)) - - // Checks if there are any predicates pushed down to the base table access. - val pushedDownPredicates = p.nonEmpty && !p.forall(_.isInstanceOf[IsNotNull]) - - localPredicates || pushedDownPredicates - case _ => false - } - - /** - * Helper case class to hold (plan, rowCount) pairs. 
- */ - private case class TableAccessCardinality(plan: LogicalPlan, size: Option[BigInt]) - - /** - * Returns the cardinality of a base table access. A base table access represents - * a LeafNode, or Project or Filter operators above a LeafNode. - */ - private def getTableAccessCardinality( - input: LogicalPlan): Option[BigInt] = input match { - case PhysicalOperation(_, cond, t: LeafNode) if t.stats(conf).rowCount.isDefined => - if (conf.cboEnabled && input.stats(conf).rowCount.isDefined) { - Option(input.stats(conf).rowCount.get) - } else { - Option(t.stats(conf).rowCount.get) - } - case _ => None - } -} - -/** * Reorder the joins and push all the conditions into join, so that the bottom ones have at least * one condition. * http://git-wip-us.apache.org/repos/asf/spark/blob/4000f128/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala index 003ce49..605c01b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala @@ -206,7 +206,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase { // and d3_fk1 = s3_pk1 // // Default join reordering: d1, f1, d2, d3, s3 - // Star join reordering: f1, d1, d3, d2,, d3 + // Star join reordering: f1, d1, d3, d2, s3 val query = d1.join(f1).join(d2).join(s3).join(d3) @@ -242,7 +242,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase { // and d3_fk1 = s3_pk1 // // Default join reordering: d1, f1, d2, d3, s3 - // Star join reordering: f1, d1, d3, d2, d3 + // Star join reordering: f1, d1, d3, d2, s3 val query = 
d1.join(f1).join(d2).join(s3).join(d3) .where((nameToAttr("f1_fk1") === nameToAttr("d1_pk1")) && --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org