This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new a2a0c52 [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession a2a0c52 is described below commit a2a0c52d7b21ef1e5f06cee6c8c83ad82f8b1b0b Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Sat Apr 25 08:53:00 2020 +0900 [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession ### What changes were proposed in this pull request? SparkSessionBuilder should not propagate static sql configurations to the existing active/default SparkSession This seems a long-standing bug. ```scala scala> spark.sql("set spark.sql.warehouse.dir").show +--------------------+--------------------+ | key| value| +--------------------+--------------------+ |spark.sql.warehou...|file:/Users/kenty...| +--------------------+--------------------+ scala> spark.sql("set spark.sql.warehouse.dir=2"); org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir; at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154) at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42) at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100) at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) ... 47 elided scala> import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get getClass getOrCreate scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate 20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect. res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6403d574 scala> spark.sql("set spark.sql.warehouse.dir").show +--------------------+-----+ | key|value| +--------------------+-----+ |spark.sql.warehou...| xyz| +--------------------+-----+ scala> OptionsAttachments ``` ### Why are the changes needed? bugfix as shown in the previous section ### Does this PR introduce any user-facing change? Yes, static SQL configurations with SparkSession.builder.config do not propagate to any existing or new SparkSession instances. ### How was this patch tested? new ut. Closes #28316 from yaooqinn/SPARK-31532. 
Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> (cherry picked from commit 8424f552293677717da7411ed43e68e73aa7f0d6) Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../scala/org/apache/spark/sql/SparkSession.scala | 28 +++++++++---- .../spark/sql/SparkSessionBuilderSuite.scala | 49 +++++++++++++++++++++- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index edbd02b..69297f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -894,7 +894,7 @@ object SparkSession extends Logging { * SparkSession exists, the method creates a new SparkSession and assigns the * newly created SparkSession as the global default. * - * In case an existing SparkSession is returned, the config options specified in + * In case an existing SparkSession is returned, the non-static config options specified in * this builder will be applied to the existing SparkSession. * * @since 2.0.0 @@ -904,10 +904,7 @@ object SparkSession extends Logging { // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } - if (options.nonEmpty) { - logWarning("Using an existing SparkSession; some configuration may not take effect.") - } + applyModifiableSettings(session) return session } @@ -916,10 +913,7 @@ object SparkSession extends Logging { // If the current thread does not have an active session, get it from the global session. 
session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } - if (options.nonEmpty) { - logWarning("Using an existing SparkSession; some configuration may not take effect.") - } + applyModifiableSettings(session) return session } @@ -972,6 +966,22 @@ object SparkSession extends Logging { return session } + + private def applyModifiableSettings(session: SparkSession): Unit = { + val (staticConfs, otherConfs) = + options.partition(kv => SQLConf.staticConfKeys.contains(kv._1)) + + otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } + + if (staticConfs.nonEmpty) { + logWarning("Using an existing SparkSession; the static sql configurations will not take" + + " effect.") + } + if (otherConfs.nonEmpty) { + logWarning("Using an existing SparkSession; some spark core configurations may not take" + + " effect.") + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index a57f09e..632856d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE +import org.apache.spark.sql.internal.StaticSQLConf._ /** * Test cases for the builder pattern of [[SparkSession]]. 
@@ -167,4 +167,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234") assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globalTempDB-SPARK-31234") } + + test("SPARK-31532: should not propagate static sql configs to the existing" + + " active/default SparkSession") { + val session = SparkSession.builder() + .master("local") + .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532") + .config("spark.app.name", "test-app-SPARK-31532") + .getOrCreate() + // do not propagate static sql configs to the existing active session + val session1 = SparkSession + .builder() + .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1") + .getOrCreate() + assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") + assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") + + // do not propagate static sql configs to the existing default session + SparkSession.clearActiveSession() + val session2 = SparkSession + .builder() + .config(WAREHOUSE_PATH.key, "SPARK-31532-db") + .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2") + .getOrCreate() + + assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db")) + assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH)) + assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") + } + + test("SPARK-31532: propagate static sql configs if no existing SparkSession") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-31532-2") + .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532") + .set(WAREHOUSE_PATH.key, "SPARK-31532-db") + SparkContext.getOrCreate(conf) + + // propagate static sql configs if no existing session + val session = SparkSession + .builder() + .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2") + 
.config(WAREHOUSE_PATH.key, "SPARK-31532-db-2") + .getOrCreate() + assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2") + assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2") + assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org