This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 94dc3c7  [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
94dc3c7 is described below

commit 94dc3c77c2228efd6234aaa3c87bec06e489c63c
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Aug 9 16:33:52 2021 +0800

    [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/33140 that proposes a simpler way to integrate columnar execution with AQE. Instead of making `ColumnarToRowExec` and `RowToColumnarExec` dynamic so that they can handle `AdaptiveSparkPlanExec`, it's simpler to let the consumer decide whether it needs columnar output and pass a boolean flag to `AdaptiveSparkPlanExec`. Spark vendors can set the flag accordingly in their custom columnar parquet writing commands when the input plan is `AdaptiveSparkPlanExec`.
    
    One possible objection is that we may need to look at the final plan of AQE and consume the data differently (in either row or columnar format). However, I can't think of such a use case, and we can always statically know whether the AQE plan should output row-based or columnar data.
    
    ### Why are the changes needed?
    
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual tests.
    
    Closes #33624 from cloud-fan/aqe.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 8714eefe6f975e6b106b59e2ab3af53df4555dce)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
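Before the diff, a rough sketch of the consumer-side usage this enables. This is hypothetical code, not part of the commit; `inputPlan`, `context`, and `preprocessingRules` stand in for values the caller already has, and only the constructor shape comes from this patch:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical: a vendor's columnar write command statically requests
// columnar output from AQE through the new constructor flag.
val aqePlan = AdaptiveSparkPlanExec(
  inputPlan,
  context,
  preprocessingRules,
  isSubquery = false,
  supportsColumnar = true)

// Because the output format is decided up front, the consumer can call
// executeColumnar() directly instead of probing the final plan at runtime.
val batches: RDD[ColumnarBatch] = aqePlan.executeColumnar()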
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 156 +++++++++------------
 .../spark/sql/execution/QueryExecution.scala       |   3 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  52 +++----
 .../spark/sql/execution/ColumnarRulesSuite.scala   |   4 +-
 4 files changed, 93 insertions(+), 122 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 406e757..a70dba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -66,9 +65,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  // child plan must be columnar or an adaptive plan, which could either be row-based or
-  // columnar, but we don't know until we execute it
-  assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
+  assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
@@ -86,25 +83,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-    child match {
-      case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
-        // if the child plan is adaptive and resulted in rows rather than columnar data
-        // then we can bypass any transition
-        a.execute()
-      case _ =>
-        val numOutputRows = longMetric("numOutputRows")
-        val numInputBatches = longMetric("numInputBatches")
-        // This avoids calling `output` in the RDD closure, so that we don't need to include
-        // the entire plan (this) in the closure.
-        val localOutput = this.output
-        child.executeColumnar().mapPartitionsInternal { batches =>
-          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-          batches.flatMap { batch =>
-            numInputBatches += 1
-            numOutputRows += batch.numRows()
-            batch.rowIterator().asScala.map(toUnsafe)
-          }
-        }
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
+    child.executeColumnar().mapPartitionsInternal { batches =>
+      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+      batches.flatMap { batch =>
+        numInputBatches += 1
+        numOutputRows += batch.numRows()
+        batch.rowIterator().asScala.map(toUnsafe)
+      }
     }
   }
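To illustrate the tightened assertion, a small sketch using the toy operators from ColumnarRulesSuite at the bottom of this diff (assuming, as in that suite, that the boolean argument of `LeafOp` is its columnar support):

ColumnarToRowExec(LeafOp(true))     // fine: child.supportsColumnar holds
// ColumnarToRowExec(LeafOp(false)) // would now fail the restored assertion;
// an AQE plan built with the default supportsColumnar = false behaves the same.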
@@ -429,10 +419,6 @@ trait RowToColumnarTransition extends UnaryExecNode
  * would only be to reduce code.
  */
 case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
-  // child plan must be row-based or an adaptive plan, which could either be row-based or
-  // columnar, but we don't know until we execute it
-  assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
-
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -455,60 +441,52 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    child match {
-      case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
-        // if the child plan is adaptive and resulted in columnar data
-        // then we can bypass any transition
-        a.executeColumnar()
-      case _ =>
-        val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
-        val numInputRows = longMetric("numInputRows")
-        val numOutputBatches = longMetric("numOutputBatches")
-        // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-        // combine with some of the Arrow conversion tools we will need to unify some of the
-        // configs.
-        val numRows = conf.columnBatchSize
-        // This avoids calling `schema` in the RDD closure, so that we don't need to include the
-        // entire plan (this) in the closure.
-        val localSchema = this.schema
-        child.execute().mapPartitionsInternal { rowIterator =>
-          if (rowIterator.hasNext) {
-            new Iterator[ColumnarBatch] {
-              private val converters = new RowToColumnConverter(localSchema)
-              private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-                OffHeapColumnVector.allocateColumns(numRows, localSchema)
-              } else {
-                OnHeapColumnVector.allocateColumns(numRows, localSchema)
-              }
-              private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-              TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-                cb.close()
-              }
-
-              override def hasNext: Boolean = {
-                rowIterator.hasNext
-              }
-
-              override def next(): ColumnarBatch = {
-                cb.setNumRows(0)
-                vectors.foreach(_.reset())
-                var rowCount = 0
-                while (rowCount < numRows && rowIterator.hasNext) {
-                  val row = rowIterator.next()
-                  converters.convert(row, vectors.toArray)
-                  rowCount += 1
-                }
-                cb.setNumRows(rowCount)
-                numInputRows += rowCount
-                numOutputBatches += 1
-                cb
-              }
-            }
+    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+    val numRows = conf.columnBatchSize
+    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localSchema = this.schema
+    child.execute().mapPartitionsInternal { rowIterator =>
+      if (rowIterator.hasNext) {
+        new Iterator[ColumnarBatch] {
+          private val converters = new RowToColumnConverter(localSchema)
+          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+            OffHeapColumnVector.allocateColumns(numRows, localSchema)
           } else {
-            Iterator.empty
+            OnHeapColumnVector.allocateColumns(numRows, localSchema)
+          }
+          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+            cb.close()
+          }
+
+          override def hasNext: Boolean = {
+            rowIterator.hasNext
+          }
+
+          override def next(): ColumnarBatch = {
+            cb.setNumRows(0)
+            vectors.foreach(_.reset())
+            var rowCount = 0
+            while (rowCount < numRows && rowIterator.hasNext) {
+              val row = rowIterator.next()
+              converters.convert(row, vectors.toArray)
+              rowCount += 1
+            }
+            cb.setNumRows(rowCount)
+            numInputRows += rowCount
+            numOutputBatches += 1
+            cb
           }
         }
+      } else {
+        Iterator.empty
+      }
     }
   }
@@ -519,9 +497,13 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
 /**
  * Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
  * to/from columnar formatted data.
+ *
+ * @param columnarRules custom columnar rules
+ * @param outputsColumnar whether or not the produced plan should output columnar format.
  */
 case class ApplyColumnarRulesAndInsertTransitions(
-    columnarRules: Seq[ColumnarRule])
+    columnarRules: Seq[ColumnarRule],
+    outputsColumnar: Boolean)
   extends Rule[SparkPlan] {
 
   /**
@@ -531,7 +513,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
     if (!plan.supportsColumnar) {
       // The tree feels kind of backwards
       // Columnar Processing will start here, so transition from row to columnar
-      RowToColumnarExec(insertTransitions(plan))
+      RowToColumnarExec(insertTransitions(plan, outputsColumnar = false))
     } else if (!plan.isInstanceOf[RowToColumnarTransition]) {
       plan.withNewChildren(plan.children.map(insertRowToColumnar))
     } else {
@@ -542,13 +524,15 @@ case class ApplyColumnarRulesAndInsertTransitions(
   /**
    * Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
    */
-  private def insertTransitions(plan: SparkPlan): SparkPlan = {
-    if (plan.supportsColumnar) {
-      // The tree feels kind of backwards
-      // This is the end of the columnar processing so go back to rows
+  private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    if (outputsColumnar) {
+      insertRowToColumnar(plan)
+    } else if (plan.supportsColumnar) {
+      // `outputsColumnar` is false but the plan outputs columnar format, so add a
+      // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
     } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions))
+      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
     } else {
       plan
     }
@@ -558,7 +542,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
     var preInsertPlan: SparkPlan = plan
     columnarRules.foreach((r : ColumnarRule) =>
       preInsertPlan = r.preColumnarTransitions(preInsertPlan))
-    var postInsertPlan = insertTransitions(preInsertPlan)
+    var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar)
     columnarRules.reverse.foreach((r : ColumnarRule) =>
       postInsertPlan = r.postColumnarTransitions(postInsertPlan))
     postInsertPlan
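A sketch of how the new flag changes the inserted transitions, again in terms of the ColumnarRulesSuite toy operators (the expected results in the comments are reasoned from the rule code above, not copied from the tests):

val toRow = ApplyColumnarRulesAndInsertTransitions(
  spark.sessionState.columnarRules, outputsColumnar = false)
val toColumnar = ApplyColumnarRulesAndInsertTransitions(
  spark.sessionState.columnarRules, outputsColumnar = true)

// A columnar leaf under a row-based operator.
val plan = UnaryOp(LeafOp(true), false)

// outputsColumnar = false: finish in row format, as before this patch.
// toRow(plan) == UnaryOp(ColumnarToRowExec(LeafOp(true)), false)

// outputsColumnar = true: the root is row-based, so a RowToColumnarExec is
// stacked on top to honor the requested output format.
// toColumnar(plan) == RowToColumnarExec(UnaryOp(ColumnarToRowExec(LeafOp(true)), false))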
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6c16dce..361a910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -414,7 +414,8 @@ object QueryExecution {
       // number of partitions when instantiating PartitioningCollection.
       RemoveRedundantSorts,
       DisableUnnecessaryBucketedScan,
-      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
+      ApplyColumnarRulesAndInsertTransitions(
+        sparkSession.sessionState.columnarRules, outputsColumnar = false),
       CollapseCodegenStages()) ++
       (if (subquery) {
         Nil
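For context, the `columnarRules` threaded through here come from the session extensions. A minimal no-op sketch of one (assuming the standard `injectColumnar` hook; real vendor rules would replace operators with columnar implementations in `preColumnarTransitions`):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// A do-nothing ColumnarRule: both hooks leave the plan unchanged.
case class NoopColumnarRule() extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = plan => plan
  override def postColumnarTransitions: Rule[SparkPlan] = plan => plan
}

// Registering it via the extensions API is what populates
// sessionState.columnarRules.
val configure: SparkSessionExtensions => Unit =
  _.injectColumnar(_ => NoopColumnarRule())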
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 9db4574..2c242d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -65,7 +65,8 @@ case class AdaptiveSparkPlanExec(
     inputPlan: SparkPlan,
     @transient context: AdaptiveExecutionContext,
     @transient preprocessingRules: Seq[Rule[SparkPlan]],
-    @transient isSubquery: Boolean)
+    @transient isSubquery: Boolean,
+    @transient override val supportsColumnar: Boolean = false)
   extends LeafExecNode {
 
   @transient private val lock = new Object()
@@ -99,7 +100,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
     RemoveRedundantProjects,
     // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
     // the final plan, but we do need to respect the user-specified repartition. Here we ask
@@ -124,12 +125,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeShuffleWithLocalRead
   )
 
+  @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] =
+    CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules
+
   // A list of physical optimizer rules to be applied right after a new stage is created. The input
   // plan to these rules has exchange as its root node.
-  @transient private val postStageCreationRules = Seq(
-    ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
-    CollapseCodegenStages()
-  ) ++ context.session.sessionState.postStageCreationRules
+  private def postStageCreationRules(outputsColumnar: Boolean) =
+    ApplyColumnarRulesAndInsertTransitions(
+      context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules
 
   private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
     val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
@@ -196,13 +199,6 @@ case class AdaptiveSparkPlanExec(
 
   override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
 
-  // This operator reports that output is row-based but because of the adaptive nature of
-  // execution, we don't really know whether the output is going to row-based or columnar
-  // until we start running the query, so there is a finalPlanSupportsColumnar method that
-  // can be called at execution time to determine what the output format is.
-  // This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
-  override def supportsColumnar: Boolean = false
-
   override def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
     executedPlan.resetMetrics()
@@ -313,7 +309,7 @@ case class AdaptiveSparkPlanExec(
         // Run the final plan when there's no more unfinished stages.
         currentPhysicalPlan = applyPhysicalRules(
           optimizeQueryStage(result.newPlan, isFinalStage = true),
-          postStageCreationRules,
+          postStageCreationRules(supportsColumnar),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         isFinalPlan = true
         executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
@@ -348,21 +344,17 @@ case class AdaptiveSparkPlanExec(
     withFinalPlanUpdate(_.execute())
   }
 
-  /**
-   * Determine if the final query stage supports columnar execution. Calling this method
-   * will trigger query execution of child query stages if they have not already executed.
-   *
-   * If this method returns true then it is safe to call doExecuteColumnar to execute the
-   * final stage.
-   */
-  def finalPlanSupportsColumnar(): Boolean = {
-    getFinalPhysicalPlan().supportsColumnar
-  }
-
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     withFinalPlanUpdate(_.executeColumnar())
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    withFinalPlanUpdate { finalPlan =>
+      assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
+      finalPlan.doExecuteBroadcast()
+    }
+  }
+
   private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
     val plan = getFinalPhysicalPlan()
     val result = fun(plan)
@@ -370,12 +362,6 @@ case class AdaptiveSparkPlanExec(
     result
   }
 
-  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    val finalPlan = getFinalPhysicalPlan()
-    assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
-    finalPlan.doExecuteBroadcast()
-  }
-
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
@@ -536,7 +522,7 @@ case class AdaptiveSparkPlanExec(
       case s: ShuffleExchangeLike =>
         val newShuffle = applyPhysicalRules(
           s.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules,
+          postStageCreationRules(outputsColumnar = false),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
           throw new IllegalStateException(
@@ -546,7 +532,7 @@ case class AdaptiveSparkPlanExec(
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules,
+          postStageCreationRules(outputsColumnar = false),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) {
           throw new IllegalStateException(
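On the consumer side, removing `finalPlanSupportsColumnar` means dispatch no longer has to wait for execution. A minimal sketch (`writeBatches` and `writeRows` are hypothetical vendor callbacks; `supportsColumnar`, `execute` and `executeColumnar` are real `SparkPlan` APIs):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

// The output format of an AdaptiveSparkPlanExec is now statically known from
// its supportsColumnar flag, so the consumer can branch before execution.
def consume(
    plan: SparkPlan,
    writeBatches: RDD[ColumnarBatch] => Unit, // hypothetical vendor callback
    writeRows: RDD[InternalRow] => Unit): Unit = { // hypothetical vendor callback
  if (plan.supportsColumnar) {
    writeBatches(plan.executeColumnar())
  } else {
    writeRows(plan.execute())
  }
}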
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
index df08acd..75223a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
@@ -27,7 +27,7 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
 
   test("Idempotency of columnar rules - RowToColumnar/ColumnarToRow") {
     val rules = ApplyColumnarRulesAndInsertTransitions(
-      spark.sessionState.columnarRules)
+      spark.sessionState.columnarRules, false)
     val plan = UnaryOp(UnaryOp(LeafOp(false), true), false)
     val expected =
@@ -40,7 +40,7 @@
   test("Idempotency of columnar rules - ColumnarToRow/RowToColumnar") {
     val rules =
       ApplyColumnarRulesAndInsertTransitions(
-        spark.sessionState.columnarRules)
+        spark.sessionState.columnarRules, false)
     val plan = UnaryOp(UnaryOp(LeafOp(true), false), true)
     val expected =
       ColumnarToRowExec(