Repository: spark Updated Branches: refs/heads/master 4133c1b0a -> 32d6d9d72
Revert "[SPARK-21845][SQL] Make codegen fallback of expressions configurable" This reverts commit 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32d6d9d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32d6d9d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32d6d9d7 Branch: refs/heads/master Commit: 32d6d9d72019404ebd47f6aa64197d9f574bac8b Parents: 4133c1b Author: gatorsmile <[email protected]> Authored: Wed Aug 30 09:08:40 2017 -0700 Committer: gatorsmile <[email protected]> Committed: Wed Aug 30 09:08:40 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../org/apache/spark/sql/execution/SparkPlan.scala | 15 ++++++++++----- .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +----------- .../org/apache/spark/sql/test/SharedSQLContext.scala | 2 -- .../org/apache/spark/sql/hive/test/TestHive.scala | 1 - 7 files changed, 16 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 24f51ef..a685099 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -551,9 +551,9 @@ object SQLConf { .intConf .createWithDefault(100) - val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback") + val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback") .internal() - .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" + + .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + " fail to compile generated code") .booleanConf .createWithDefault(true) @@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging { def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) - def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) + def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b1db9dd..c7277c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -56,10 +56,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext - // whether we should fallback when hitting compilation errors caused by codegen - private val codeGenFallBack = sqlContext.conf.codegenFallback - - protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled + // sqlContext will be null when we are being deserialized on the slaves. In this instance + // the value of subexpressionEliminationEnabled will be set by the deserializer after the + // constructor has run. + val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { + sqlContext.conf.subexpressionEliminationEnabled + } else { + false + } /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { @@ -366,7 +370,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ try { GeneratePredicate.generate(expression, inputSchema) } catch { - case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack => + case e @ (_: JaninoRuntimeException | _: CompileException) + if sqlContext == null || sqlContext.conf.wholeStageFallback => genInterpretedPredicate(expression, inputSchema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index a41a7ca..bacb709 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co try { CodeGenerator.compile(cleanedSource) } catch { - case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback => + case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback => // We should already saw the error message logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") return child.execute() http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 50e4759..0681b9c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { v } withSQLConf( - (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString), + (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString), (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) { val df = spark.range(0, 4, 1, 4).withColumn("c", c) val rows = df.collect() http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1334164..5eb34e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2011,17 +2011,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val filter = (0 until N) .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string")) - - withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") { - df.filter(filter).count() - } - - withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") { - val e = intercept[SparkException] { - df.filter(filter).count() - }.getMessage - assert(e.contains("grows beyond 64 KB")) - } + df.filter(filter).count } test("SPARK-20897: cached self-join should not fail") { http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index cd8d070..1f073d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -24,7 +24,6 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.internal.SQLConf /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. @@ -35,7 +34,6 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua new SparkConf() .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) .set("spark.unsafe.exceptionOnMemoryLeak", "true") - .set(SQLConf.CODEGEN_FALLBACK.key, "false") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 0f6a81b..10c9a2d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -51,7 +51,6 @@ object TestHive "TestSQLContext", new SparkConf() .set("spark.sql.test", "") - .set(SQLConf.CODEGEN_FALLBACK.key, "false") .set("spark.sql.hive.metastore.barrierPrefixes", "org.apache.spark.sql.hive.execution.PairSerDe") .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
