Repository: spark Updated Branches: refs/heads/branch-2.0 a780848af -> 71e8aaeaa
[SPARK-6320][SQL] Move planLater method into GenericStrategy. ## What changes were proposed in this pull request? This PR is the minimal version of #13147 for `branch-2.0`. ## How was this patch tested? Picked `SparkPlannerSuite` from #13147. Author: Takuya UESHIN <[email protected]> Closes #13426 from ueshin/issues/SPARK-6320_2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71e8aaea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71e8aaea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71e8aaea Branch: refs/heads/branch-2.0 Commit: 71e8aaeaa9c2983d6f2ab8c512e59f5b13e8844e Parents: a780848 Author: Takuya UESHIN <[email protected]> Authored: Wed Jun 1 10:28:48 2016 -0700 Committer: Michael Armbrust <[email protected]> Committed: Wed Jun 1 10:28:48 2016 -0700 ---------------------------------------------------------------------- .../sql/catalyst/planning/QueryPlanner.scala | 15 ++--- .../spark/sql/execution/SparkPlanner.scala | 11 ++++ .../spark/sql/execution/SparkStrategies.scala | 23 +++++++ .../scala/org/apache/spark/sql/package.scala | 4 +- .../spark/sql/execution/SparkPlannerSuite.scala | 63 ++++++++++++++++++++ 5 files changed, 107 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f..327a048 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,13 +55,6 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... val iter = strategies.view.flatMap(_(plan)).toIterator http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index de832ec..319fff1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} import org.apache.spark.sql.internal.SQLConf @@ -42,6 +43,16 @@ class SparkPlanner( InMemoryScans :: BasicOperators :: Nil) + override def plan(plan: LogicalPlan): Iterator[SparkPlan] = { + super.plan(plan).map { + _.transformUp { + case PlanLater(p) => + // TODO: use the first plan for now, but we will implement plan space exploaration later. + this.plan(p).next() + } + } + } + /** * Used to build table scan operators where complex projection and filtering are done using * separate physical operators. This function returns the given scan operator with Project and http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7e3e45e..5a069f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -35,6 +37,27 @@ import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ContinuousQuery +/** + * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting + * with the query planner and is not designed to be stable across spark releases. Developers + * writing libraries should instead consider using the stable APIs provided in + * [[org.apache.spark.sql.sources]] + */ +@DeveloperApi +abstract class SparkStrategy extends GenericStrategy[SparkPlan] { + + override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan) +} + +private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode { + + override def output: Seq[Attribute] = plan.output + + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException() + } +} + private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 97e35bb..28d8bc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -18,7 +18,7 @@ package org.apache.spark import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} /** * Allows the execution of relational queries, including those expressed in SQL using Spark. @@ -40,7 +40,7 @@ package object sql { * [[org.apache.spark.sql.sources]] */ @DeveloperApi - type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan] + type Strategy = SparkStrategy type DataFrame = Dataset[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala new file mode 100644 index 0000000..aecfd30 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union} +import org.apache.spark.sql.test.SharedSQLContext + +class SparkPlannerSuite extends SharedSQLContext { + import testImplicits._ + + test("Ensure to go down only the first branch, not any other possible branches") { + + case object NeverPlanned extends LeafNode { + override def output: Seq[Attribute] = Nil + } + + var planned = 0 + object TestStrategy extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReturnAnswer(child) => + planned += 1 + planLater(child) :: planLater(NeverPlanned) :: Nil + case Union(children) => + planned += 1 + UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil + case LocalRelation(output, data) => + planned += 1 + LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil + case NeverPlanned => + fail("QueryPlanner should not go down to this branch.") + case _ => Nil + } + } + + try { + spark.experimental.extraStrategies = TestStrategy :: Nil + + val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) + + assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) + assert(planned === 4) + } finally { + spark.experimental.extraStrategies = Nil + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
