This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new a2a0c52 [SPARK-31532][SQL] Builder should not propagate static sql
configs to the existing active or default SparkSession
a2a0c52 is described below
commit a2a0c52d7b21ef1e5f06cee6c8c83ad82f8b1b0b
Author: Kent Yao <[email protected]>
AuthorDate: Sat Apr 25 08:53:00 2020 +0900
[SPARK-31532][SQL] Builder should not propagate static sql configs to the
existing active or default SparkSession
### What changes were proposed in this pull request?
SparkSessionBuilder should not propagate static sql configurations to the
existing active/default SparkSession.
This seems to be a long-standing bug.
```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+
scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static
config: spark.sql.warehouse.dir;
at
org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
at
org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
at
org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
... 47 elided
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
getClass getOrCreate
scala> SparkSession.builder.config("spark.sql.warehouse.dir",
"xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing
SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession =
org.apache.spark.sql.SparkSession6403d574
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
| key|value|
+--------------------+-----+
|spark.sql.warehou...| xyz|
+--------------------+-----+
scala>
```
### Why are the changes needed?
This is a bug fix, as shown in the previous section.
### Does this PR introduce any user-facing change?
Yes, static SQL configurations with SparkSession.builder.config do not
propagate to any existing or new SparkSession instances.
### How was this patch tested?
New unit tests.
Closes #28316 from yaooqinn/SPARK-31532.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
(cherry picked from commit 8424f552293677717da7411ed43e68e73aa7f0d6)
Signed-off-by: Takeshi Yamamuro <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 28 +++++++++----
.../spark/sql/SparkSessionBuilderSuite.scala | 49 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index edbd02b..69297f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -894,7 +894,7 @@ object SparkSession extends Logging {
* SparkSession exists, the method creates a new SparkSession and assigns
the
* newly created SparkSession as the global default.
*
- * In case an existing SparkSession is returned, the config options
specified in
+ * In case an existing SparkSession is returned, the non-static config
options specified in
* this builder will be applied to the existing SparkSession.
*
* @since 2.0.0
@@ -904,10 +904,7 @@ object SparkSession extends Logging {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) =>
session.sessionState.conf.setConfString(k, v) }
- if (options.nonEmpty) {
- logWarning("Using an existing SparkSession; some configuration may
not take effect.")
- }
+ applyModifiableSettings(session)
return session
}
@@ -916,10 +913,7 @@ object SparkSession extends Logging {
// If the current thread does not have an active session, get it from
the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) =>
session.sessionState.conf.setConfString(k, v) }
- if (options.nonEmpty) {
- logWarning("Using an existing SparkSession; some configuration may
not take effect.")
- }
+ applyModifiableSettings(session)
return session
}
@@ -972,6 +966,22 @@ object SparkSession extends Logging {
return session
}
+
+ private def applyModifiableSettings(session: SparkSession): Unit = {
+ val (staticConfs, otherConfs) =
+ options.partition(kv => SQLConf.staticConfKeys.contains(kv._1))
+
+ otherConfs.foreach { case (k, v) =>
session.sessionState.conf.setConfString(k, v) }
+
+ if (staticConfs.nonEmpty) {
+ logWarning("Using an existing SparkSession; the static sql
configurations will not take" +
+ " effect.")
+ }
+ if (otherConfs.nonEmpty) {
+ logWarning("Using an existing SparkSession; some spark core
configurations may not take" +
+ " effect.")
+ }
+ }
}
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index a57f09e..632856d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf._
/**
* Test cases for the builder pattern of [[SparkSession]].
@@ -167,4 +167,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
BeforeAndAfterEach {
assert(session.sessionState.conf.getConfString("spark.app.name") ===
"test-app-SPARK-31234")
assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) ===
"globalTempDB-SPARK-31234")
}
+
+ test("SPARK-31532: should not propagate static sql configs to the existing" +
+ " active/default SparkSession") {
+ val session = SparkSession.builder()
+ .master("local")
+ .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532")
+ .config("spark.app.name", "test-app-SPARK-31532")
+ .getOrCreate()
+ // do not propagate static sql configs to the existing active session
+ val session1 = SparkSession
+ .builder()
+ .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
+ .getOrCreate()
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
+ assert(session1.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
+
+ // do not propagate static sql configs to the existing default session
+ SparkSession.clearActiveSession()
+ val session2 = SparkSession
+ .builder()
+ .config(WAREHOUSE_PATH.key, "SPARK-31532-db")
+ .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
+ .getOrCreate()
+
+ assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
+ assert(session.conf.get(WAREHOUSE_PATH) ===
session2.conf.get(WAREHOUSE_PATH))
+ assert(session2.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
+ }
+
+ test("SPARK-31532: propagate static sql configs if no existing
SparkSession") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test-app-SPARK-31532-2")
+ .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532")
+ .set(WAREHOUSE_PATH.key, "SPARK-31532-db")
+ SparkContext.getOrCreate(conf)
+
+ // propagate static sql configs if no existing session
+ val session = SparkSession
+ .builder()
+ .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
+ .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
+ .getOrCreate()
+ assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532-2")
+ assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]