Repository: spark
Updated Branches:
  refs/heads/master a6cd9dbc6 -> ae83c2112


[SPARK-18998][SQL] Add a cbo conf to switch between default statistics and 
estimated statistics

## What changes were proposed in this pull request?

We add a cbo configuration to switch between default stats and estimated stats.
We also define a new statistics method `planStats` in LogicalPlan with conf as 
its parameter, in order to pass the cbo switch and other estimation related 
configurations in the future. `planStats` is used on the caller sides (i.e. in 
Optimizer and Strategies) to make transformation decisions based on stats.

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang <wzh_...@163.com>

Closes #16401 from wzhfy/cboSwitch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae83c211
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae83c211
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae83c211

Branch: refs/heads/master
Commit: ae83c211257c508989c703d54f2aeec8b2b5f14d
Parents: a6cd9db
Author: Zhenhua Wang <wzh_...@163.com>
Authored: Tue Jan 3 12:19:52 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Jan 3 12:19:52 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/CatalystConf.scala       |  6 ++
 .../sql/catalyst/optimizer/Optimizer.scala      |  6 +-
 .../catalyst/plans/logical/LogicalPlan.scala    | 24 +++++++
 .../catalyst/optimizer/LimitPushdownSuite.scala |  3 +-
 .../spark/sql/execution/SparkStrategies.scala   | 12 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 +++
 .../statsEstimation/StatsEstimationSuite.scala  | 67 ++++++++++++++++++++
 7 files changed, 117 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 75ae588..b805cfe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -49,6 +49,11 @@ trait CatalystConf {
   def resolver: Resolver = {
     if (caseSensitiveAnalysis) caseSensitiveResolution else 
caseInsensitiveResolution
   }
+
+  /**
+   * Enables CBO for estimation of plan statistics when set true.
+   */
+  def cboEnabled: Boolean
 }
 
 
@@ -62,5 +67,6 @@ case class SimpleCatalystConf(
     maxCaseBranchesForCodegen: Int = 20,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
+    cboEnabled: Boolean = false,
     warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dfd66aa..d1f90e6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -82,7 +82,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: CatalystConf)
       EliminateOuterJoin,
       PushPredicateThroughJoin,
       PushDownPredicate,
-      LimitPushDown,
+      LimitPushDown(conf),
       ColumnPruning,
       InferFiltersFromConstraints,
       // Operator combine
@@ -209,7 +209,7 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed 
inputs of outer joins.
  */
-object LimitPushDown extends Rule[LogicalPlan] {
+case class LimitPushDown(conf: CatalystConf) extends Rule[LogicalPlan] {
 
   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
@@ -253,7 +253,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case FullOuter =>
           (left.maxRows, right.maxRows) match {
             case (None, None) =>
-              if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) 
{
+              if (left.planStats(conf).sizeInBytes >= 
right.planStats(conf).sizeInBytes) {
                 join.copy(left = maybePushLimit(exp, left))
               } else {
                 join.copy(right = maybePushLimit(exp, right))

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b0a4145..4f634cb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -95,6 +96,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] 
with Logging {
   }
 
   /**
+   * Returns the default statistics or statistics estimated by cbo based on 
configuration.
+   */
+  final def planStats(conf: CatalystConf): Statistics = {
+    if (conf.cboEnabled) {
+      if (estimatedStats.isEmpty) {
+        estimatedStats = Some(cboStatistics(conf))
+      }
+      estimatedStats.get
+    } else {
+      statistics
+    }
+  }
+
+  /**
+   * Returns statistics estimated by cbo. If the plan doesn't override this, 
it returns the
+   * default statistics.
+   */
+  protected def cboStatistics(conf: CatalystConf): Statistics = statistics
+
+  /** A cache for the estimated statistics, such that it will only be computed 
once. */
+  private var estimatedStats: Option[Statistics] = None
+
+  /**
    * Returns the maximum number of rows that this plan may compute.
    *
    * Any operator that a Limit can be pushed passed should override this 
function (e.g., Union).

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index dcbc793..9ec9983 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -32,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Limit pushdown", FixedPoint(100),
-        LimitPushDown,
+        LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         CombineLimits,
         ConstantFolding,
         BooleanSimplification) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ba82ec1..81cd5ef 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,9 +114,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
      * Matches a plan whose output should be small enough to be used in 
broadcast join.
      */
     private def canBroadcast(plan: LogicalPlan): Boolean = {
-      plan.statistics.isBroadcastable ||
-        (plan.statistics.sizeInBytes >= 0 &&
-          plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
+      plan.planStats(conf).isBroadcastable ||
+        (plan.planStats(conf).sizeInBytes >= 0 &&
+          plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
     }
 
     /**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
      * dynamic.
      */
     private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
-      plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * 
conf.numShufflePartitions
+      plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * 
conf.numShufflePartitions
     }
 
     /**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
      * use the size of bytes here as estimation.
      */
     private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-      a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
+      a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
     }
 
     private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
       case logical.Join(left, right, joinType, condition) =>
         val buildSide =
-          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (right.planStats(conf).sizeInBytes <= 
left.planStats(conf).sizeInBytes) {
             BuildRight
           } else {
             BuildLeft

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 304dcb6..322cc7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,6 +642,12 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val CBO_ENABLED =
+    SQLConfigBuilder("spark.sql.cbo.enabled")
+      .doc("Enables CBO for estimation of plan statistics when set true.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
   override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+  override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/ae83c211/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
new file mode 100644
index 0000000..78f2ce1
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.statsEstimation
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.IntegerType
+
+
+class StatsEstimationSuite extends SharedSQLContext {
+  test("statistics for a plan based on the cbo switch") {
+    val expectedDefaultStats =
+      Statistics(
+        sizeInBytes = 40,
+        rowCount = Some(10),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), 
Some(10), 0, 4, 4))),
+        isBroadcastable = false)
+    val expectedCboStats =
+      Statistics(
+        sizeInBytes = 4,
+        rowCount = Some(1),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), 
Some(5), 0, 4, 4))),
+        isBroadcastable = false)
+
+    val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats 
= expectedCboStats)
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // Use the statistics estimated by cbo
+      assert(plan.planStats(spark.sessionState.conf) == expectedCboStats)
+    }
+    withSQLConf("spark.sql.cbo.enabled" -> "false") {
+      // Use the default statistics
+      assert(plan.planStats(spark.sessionState.conf) == expectedDefaultStats)
+    }
+  }
+}
+
+/**
+ * This class is used for unit-testing the cbo switch, it mimics a logical 
plan which has both
+ * default statistics and cbo estimated statistics.
+ */
+private case class DummyLogicalPlan(
+    defaultStats: Statistics,
+    cboStats: Statistics) extends LogicalPlan {
+  override lazy val statistics = defaultStats
+  override def cboStatistics(conf: CatalystConf): Statistics = cboStats
+  override def output: Seq[Attribute] = Nil
+  override def children: Seq[LogicalPlan] = Nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to