Repository: spark Updated Branches: refs/heads/master 58353d7f4 -> 5ed7660d1
[SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule Exclusion ## What changes were proposed in this pull request? This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer. To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define its default rule set as well as rules that cannot be excluded by the SQL config. In the meantime, Optimizer's `batches` method is dedicated to the rule exclusion logic and is defined "final". ## How was this patch tested? Added UT. Author: maryannxue <maryann...@apache.org> Closes #21876 from maryannxue/rule-exclusion. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ed7660d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ed7660d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ed7660d Branch: refs/heads/master Commit: 5ed7660d14022eb65396e28496c06e47c1dbab1d Parents: 58353d7 Author: maryannxue <maryann...@apache.org> Authored: Thu Jul 26 11:06:23 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Thu Jul 26 11:06:23 2018 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 24 ++++++++- .../optimizer/OptimizerExtendableSuite.scala | 2 +- .../optimizer/OptimizerRuleExclusionSuite.scala | 53 +++++++++++++++----- ...timizerStructuralIntegrityCheckerSuite.scala | 2 +- .../spark/sql/execution/SparkOptimizer.scala | 5 +- 5 files changed, 69 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5ed7660d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index adb1350..3c264eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -46,6 +46,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) + /** + * Defines the default rule batches in the Optimizer. + * + * Implementations of this class should override this method, and [[nonExcludableRules]] if + * necessary, instead of [[batches]]. The rule batches that eventually run in the Optimizer, + * i.e., returned by [[batches]], will be (defaultBatches - (excludedRules - nonExcludableRules)). + */ def defaultBatches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( @@ -160,6 +167,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) UpdateNullabilityInAttributeReferences) } + /** + * Defines rules that cannot be excluded from the Optimizer even if they are specified in + * SQL config "excludedRules". + * + * Implementations of this class can override this method if necessary. The rule batches + * that eventually run in the Optimizer, i.e., returned by [[batches]], will be + * (defaultBatches - (excludedRules - nonExcludableRules)). + */ def nonExcludableRules: Seq[String] = EliminateDistinct.ruleName :: EliminateSubqueryAliases.ruleName :: @@ -202,7 +217,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil - override def batches: Seq[Batch] = { + /** + * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that + * eventually run in the Optimizer. + * + * Implementations of this class should override [[defaultBatches]], and [[nonExcludableRules]] + * if necessary, instead of this method. + */ + final override def batches: Seq[Batch] = { val excludedRulesConf = SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) val excludedRules = excludedRulesConf.filter { ruleName => http://git-wip-us.apache.org/repos/asf/spark/blob/5ed7660d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala index 7112c03..36b083a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala @@ -47,7 +47,7 @@ class OptimizerExtendableSuite extends SparkFunSuite { DummyRule) :: Nil } - override def batches: Seq[Batch] = super.batches ++ myBatches + override def defaultBatches: Seq[Batch] = super.defaultBatches ++ myBatches } test("Extending batches possible") { http://git-wip-us.apache.org/repos/asf/spark/blob/5ed7660d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index 5a5396e..30c80d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -28,8 +28,10 @@ class OptimizerRuleExclusionSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - private def verifyExcludedRules(excludedRuleNames: Seq[String]) { - val optimizer = new SimpleTestOptimizer() + private def verifyExcludedRules(optimizer: Optimizer, rulesToExclude: Seq[String]) { + val nonExcludableRules = optimizer.nonExcludableRules + + val excludedRuleNames = rulesToExclude.filter(!nonExcludableRules.contains(_)) // Batches whose rules are all to be excluded should be removed as a whole. val excludedBatchNames = optimizer.batches .filter(batch => batch.rules.forall(rule => excludedRuleNames.contains(rule.ruleName))) @@ -38,21 +40,31 @@ class OptimizerRuleExclusionSuite extends PlanTest { withSQLConf( OPTIMIZER_EXCLUDED_RULES.key -> excludedRuleNames.foldLeft("")((l, r) => l + "," + r)) { val batches = optimizer.batches + // Verify removed batches. assert(batches.forall(batch => !excludedBatchNames.contains(batch.name))) + // Verify removed rules. assert( batches .forall(batch => batch.rules.forall(rule => !excludedRuleNames.contains(rule.ruleName)))) + // Verify non-excludable rules retained. + nonExcludableRules.foreach { nonExcludableRule => + assert( + optimizer.batches + .exists(batch => batch.rules.exists(rule => rule.ruleName == nonExcludableRule))) + } } } test("Exclude a single rule from multiple batches") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( PushPredicateThroughJoin.ruleName)) } test("Exclude multiple rules from single or multiple batches") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( CombineUnions.ruleName, RemoveLiteralFromGroupExpressions.ruleName, @@ -61,6 +73,7 @@ class OptimizerRuleExclusionSuite extends PlanTest { test("Exclude non-existent rule with other valid rules") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( LimitPushDown.ruleName, InferFiltersFromConstraints.ruleName, @@ -68,20 +81,34 @@ class OptimizerRuleExclusionSuite extends PlanTest { } test("Try to exclude a non-excludable rule") { - val excludedRules = Seq( - ReplaceIntersectWithSemiJoin.ruleName, - PullupCorrelatedPredicates.ruleName) + verifyExcludedRules( + new SimpleTestOptimizer(), + Seq( + ReplaceIntersectWithSemiJoin.ruleName, + PullupCorrelatedPredicates.ruleName)) + } - val optimizer = new SimpleTestOptimizer() + test("Custom optimizer") { + val optimizer = new SimpleTestOptimizer() { + override def defaultBatches: Seq[Batch] = + Batch("push", Once, + PushDownPredicate, + PushPredicateThroughJoin, + PushProjectionThroughUnion) :: + Batch("pull", Once, + PullupCorrelatedPredicates) :: Nil - withSQLConf( - OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) { - excludedRules.foreach { excludedRule => - assert( - optimizer.batches - .exists(batch => batch.rules.exists(rule => rule.ruleName == excludedRule))) - } + override def nonExcludableRules: Seq[String] = + PushDownPredicate.ruleName :: + PullupCorrelatedPredicates.ruleName :: Nil } + + verifyExcludedRules( + optimizer, + Seq( + PushDownPredicate.ruleName, + PushProjectionThroughUnion.ruleName, + PullupCorrelatedPredicates.ruleName)) } test("Verify optimized plan after excluding CombineUnions rule") { http://git-wip-us.apache.org/repos/asf/spark/blob/5ed7660d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 6e183d8..a22a81e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -44,7 +44,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { EmptyFunctionRegistry, new SQLConf())) { val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI) - override def batches: Seq[Batch] = Seq(newBatch) ++ super.batches + override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches } test("check for invalid plan after execution of rule") { http://git-wip-us.apache.org/repos/asf/spark/blob/5ed7660d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 00ff4c8..64d3f2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -28,13 +28,16 @@ class SparkOptimizer( experimentalMethods: ExperimentalMethods) extends Optimizer(catalog) { - override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+ + override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++ postHocOptimizationBatches :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + override def nonExcludableRules: Seq[String] = + super.nonExcludableRules :+ ExtractPythonUDFFromAggregate.ruleName + /** * Optimization batches that are executed before the regular optimization batches (also before * the finish analysis batch). --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org