This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3038a81 [SPARK-30556][SQL][FOLLOWUP] Reset the status changed in
SQLExecution.withThreadLocalCaptured
3038a81 is described below
commit 3038a81ecdb526b01e80eeb34a7eacc6ac48d360
Author: Yuanjian Li <[email protected]>
AuthorDate: Mon Feb 10 22:16:25 2020 +0100
[SPARK-30556][SQL][FOLLOWUP] Reset the status changed in
SQLExecution.withThreadLocalCaptured
### What changes were proposed in this pull request?
Follow-up to #27267: after the body has run, restore the thread state (the active
SparkSession and the SparkContext local properties) that
SQLExecution.withThreadLocalCaptured changes.
### Why are the changes needed?
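For code safety: the body runs inside a Future on the supplied executor, and without the
reset the active session and local properties set for the body would remain on that
executor thread after the body finishes.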
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests (ExecutorSideSQLConfSuite, adjusted in this commit).
Closes #27516 from xuanyuanking/SPARK-30556-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: herman <[email protected]>
(cherry picked from commit a6b91d2bf727e175d0e175295001db85647539b1)
Signed-off-by: herman <[email protected]>
---
.../scala/org/apache/spark/sql/execution/SQLExecution.scala | 12 +++++++++++-
.../apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala | 10 ++++++----
2 files changed, 17 insertions(+), 5 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 995d94e..9f17781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -177,9 +177,19 @@ object SQLExecution {
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
Future {
+ val originalSession = SparkSession.getActiveSession
+ val originalLocalProps = sc.getLocalProperties
SparkSession.setActiveSession(activeSession)
sc.setLocalProperties(localProps)
- body
+ val res = body
+ // reset active session and local props.
+ sc.setLocalProperties(originalLocalProps)
+ if (originalSession.nonEmpty) {
+ SparkSession.setActiveSession(originalSession.get)
+ } else {
+ SparkSession.clearActiveSession()
+ }
+ res
}(exec)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 0cc658c..46d0c64 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.internal
+import java.util.UUID
+
import org.scalatest.Assertions._
import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
@@ -144,16 +146,16 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with
SQLTestUtils {
}
// set local configuration and assert
- val confValue1 = "e"
+ val confValue1 = UUID.randomUUID().toString()
createDataframe(confKey, confValue1).createOrReplaceTempView("m")
spark.sparkContext.setLocalProperty(confKey, confValue1)
- assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM
m)").collect.size == 1)
+ assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM
m)").collect().length == 1)
// change the conf value and assert again
- val confValue2 = "f"
+ val confValue2 = UUID.randomUUID().toString()
createDataframe(confKey, confValue2).createOrReplaceTempView("n")
spark.sparkContext.setLocalProperty(confKey, confValue2)
- assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM
n)").collect().size == 1)
+ assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM
n)").collect().length == 1)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]