Repository: spark Updated Branches: refs/heads/branch-2.0 91dffcabd -> f0fa0a894
[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code ## What changes were proposed in this pull request? If a bug in whole-stage codegen produces generated code that can't be compiled, we should fall back to non-codegen execution to make sure that the query can still run. The batch mode of the new Parquet reader depends on codegen and can't easily be switched to non-batch mode, so we still use codegen for batched scans (for Parquet). Because it only supports primitive types and the number of columns is less than spark.sql.codegen.maxFields (100), it should not fail. The fallback can be configured with `spark.sql.codegen.fallback`. ## How was this patch tested? Manually tested with a buggy operator; the fallback worked well. Author: Davies Liu <[email protected]> Closes #13501 from davies/codegen_fallback. (cherry picked from commit 7504bc73f20fe0e6546a019ed91c3fd3804287ba) Signed-off-by: Davies Liu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0fa0a89 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0fa0a89 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0fa0a89 Branch: refs/heads/branch-2.0 Commit: f0fa0a8946fb4bdf0f4697a8e389f49e98422871 Parents: 91dffca Author: Davies Liu <[email protected]> Authored: Fri Jun 10 21:12:06 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Fri Jun 10 21:12:15 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/ExistingRDD.scala | 5 ++++- .../spark/sql/execution/WholeStageCodegenExec.scala | 11 ++++++++++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9ab98fd1..ee72a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec( "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException + // in the case of fallback, this batched scan should never fail because of: + // 1) only primitive types are supported + // 2) the number of columns should be smaller than spark.sql.codegen.maxFields + WholeStageCodegenExec(this).execute() } override def simpleString: String = { http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e0d8e35..ac4c3aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * An interface for those physical operators that support codegen. 
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments())) logDebug(s"\n${CodeFormatter.format(cleanedSource)}") - CodeGenerator.compile(cleanedSource) (ctx, cleanedSource) } override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() + // try to compile and fallback if it failed + try { + CodeGenerator.compile(cleanedSource) + } catch { + case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback => + // We should already saw the error message + logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") + return child.execute() + } val references = ctx.references.toArray val durationMs = longMetric("pipelineTime") http://git-wip-us.apache.org/repos/asf/spark/blob/f0fa0a89/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 437e093..27b1fff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -433,7 +433,14 @@ object SQLConf { .doc("The maximum number of fields (including nested fields) that will be supported before" + " deactivating whole-stage codegen.") .intConf - .createWithDefault(200) + .createWithDefault(100) + + val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback") + .internal() + .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + + " fail to compile generated code") + .booleanConf + .createWithDefault(true) val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches") .internal() @@ -605,6 +612,8 @@ private[sql] class SQLConf extends 
Serializable with CatalystConf with Logging { def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) + def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) + def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
