Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a589736a1 -> 32911de77


[SPARK-11700] [SQL] Remove thread local SQLContext in SparkPlan

In 1.6, we introduced a public API to set the active SQLContext for the current thread;
SparkPlan should use that API instead of keeping its own thread-local SQLContext.

Author: Davies Liu <[email protected]>

Closes #9990 from davies/leak_context.

(cherry picked from commit 17275fa99c670537c52843df405279a52b5c9594)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32911de7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32911de7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32911de7

Branch: refs/heads/branch-1.6
Commit: 32911de77d0ff346f821c7cd1dc607c611780164
Parents: a589736
Author: Davies Liu <[email protected]>
Authored: Mon Nov 30 10:32:13 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Mon Nov 30 10:32:30 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/SQLContext.scala  | 10 +++++-----
 .../apache/spark/sql/execution/QueryExecution.scala   |  3 +--
 .../org/apache/spark/sql/execution/SparkPlan.scala    | 14 ++++----------
 .../org/apache/spark/sql/MultiSQLContextsSuite.scala  |  2 +-
 .../sql/execution/ExchangeCoordinatorSuite.scala      |  2 +-
 .../sql/execution/RowFormatConvertersSuite.scala      |  4 ++--
 6 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 46bf544..9cc65de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,6 @@ import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkException, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -45,9 +44,10 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * The entry point for working with structured data (rows and columns) in 
Spark.  Allows the
@@ -401,7 +401,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
-    SparkPlan.currentContext.set(self)
+    SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
@@ -417,7 +417,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
-    SparkPlan.currentContext.set(self)
+    SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
@@ -1328,7 +1328,7 @@ object SQLContext {
     activeContext.remove()
   }
 
-  private[sql] def getActiveContextOption(): Option[SQLContext] = {
+  private[sql] def getActive(): Option[SQLContext] = {
     Option(activeContext.get())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5da5aea..107570f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -42,9 +42,8 @@ class QueryExecution(val sqlContext: SQLContext, val logical: 
LogicalPlan) {
 
   lazy val optimizedPlan: LogicalPlan = 
sqlContext.optimizer.execute(withCachedData)
 
-  // TODO: Don't just pick the first one...
   lazy val sparkPlan: SparkPlan = {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     sqlContext.planner.plan(optimizedPlan).next()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 534a3bc..507641f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,21 +23,15 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
 import org.apache.spark.sql.types.DataType
 
-object SparkPlan {
-  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
-}
-
 /**
  * The base class for physical operators.
  */
@@ -49,7 +43,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected[spark] final val sqlContext = SparkPlan.currentContext.get()
+  protected[spark] final val sqlContext = SQLContext.getActive().get
 
   protected def sparkContext = sqlContext.sparkContext
 
@@ -69,7 +63,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 
   /** Overridden make copy also propogates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     super.makeCopy(newArgs)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
index 34c5c68..162c0b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
@@ -27,7 +27,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with 
BeforeAndAfterAll {
   private var sparkConf: SparkConf = _
 
   override protected def beforeAll(): Unit = {
-    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalActiveSQLContext = SQLContext.getActive()
     originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
 
     SQLContext.clearActive()

http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index b96d50a..180050b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -30,7 +30,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with 
BeforeAndAfterAll {
   private var originalInstantiatedSQLContext: Option[SQLContext] = _
 
   override protected def beforeAll(): Unit = {
-    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalActiveSQLContext = SQLContext.getActive()
     originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
 
     SQLContext.clearActive()

http://git-wip-us.apache.org/repos/asf/spark/blob/32911de7/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 6876ab0..13d68a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Attribute, Literal, IsNull}
 import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -94,7 +94,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with 
SharedSQLContext {
   }
 
   test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     val schema = ArrayType(StringType)
     val rows = (1 to 100).map { i =>
       InternalRow(new 
GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to