Daniel Darabos created SPARK-12964:
--------------------------------------

             Summary: SparkContext.localProperties leaked
                 Key: SPARK-12964
                 URL: https://issues.apache.org/jira/browse/SPARK-12964
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.0
            Reporter: Daniel Darabos
            Priority: Minor


I have a non-deterministic but quite reliable reproduction for a case where 
{{spark.sql.execution.id}} is leaked. Operations then die with 
{{spark.sql.execution.id is already set}}. These threads never recover because 
there is nothing to unset {{spark.sql.execution.id}}. (It's not a case of 
nested {{withNewExecutionId}} calls.)
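
For context, the error comes from the guard in {{SQLExecution.withNewExecutionId}} (visible in the stack trace below). The sketch here is a simplified paraphrase of that guard, not the verbatim Spark 1.6 source; {{ExecutionIdGuard}} and {{nextId}} are just stand-in names, while {{getLocalProperty}}/{{setLocalProperty}} are the real {{SparkContext}} methods backed by {{localProperties}}:

{noformat}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext

// Simplified paraphrase of SQLExecution.withNewExecutionId (Spark 1.6),
// reduced to the parts relevant to this issue.
object ExecutionIdGuard {
  private val nextId = new AtomicLong(0)

  def withNewExecutionId[T](sc: SparkContext)(body: => T): T = {
    val key = "spark.sql.execution.id"
    if (sc.getLocalProperty(key) != null) {
      // A thread that inherited a stale execution id always ends up here.
      throw new IllegalArgumentException(s"$key is already set")
    }
    sc.setLocalProperty(key, nextId.getAndIncrement().toString)
    try body
    finally sc.setLocalProperty(key, null) // clears it on this thread only
  }
}
{noformat}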

I have figured out why this happens. We are inside a {{withNewExecutionId}} 
block. At some point we call back into user code. (In our case this is a custom 
data source's {{buildScan}} method.) The user code calls 
{{scala.concurrent.Await.result}}. Because our thread belongs to a 
{{ForkJoinPool}} (it is a Play HTTP serving thread), {{Await.result}} starts a 
new thread. {{SparkContext.localProperties}} is cloned for this new thread, 
which is then ready to serve HTTP requests.
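
Here is a minimal, Spark-free sketch of the mechanism: an 
{{InheritableThreadLocal}} stands in for {{SparkContext.localProperties}}, and 
the {{Future}}-based setup only approximates the Play/ForkJoinPool environment 
({{LocalPropertyLeakSketch}} and the property string are made up for 
illustration):

{noformat}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object LocalPropertyLeakSketch extends App {
  // Stand-in for SparkContext.localProperties.
  val localProps = new InheritableThreadLocal[String] {
    // Invoked on the parent thread while a child Thread is being constructed
    // (compare the Thread.init frames in the stack trace below).
    override def childValue(parent: String): String = {
      println(s"cloning '$parent' into a newly created thread")
      parent
    }
  }

  // Run on a ForkJoinPool worker, like a Play request-handling thread.
  val request = Future {
    localProps.set("spark.sql.execution.id=0") // like withNewExecutionId
    try {
      // Like buildScan calling Await.result: blocking on a ForkJoinPool
      // worker can make the pool spawn a compensation thread, which then
      // inherits localProps via childValue above.
      Await.result(Future { Thread.sleep(100); 42 }, 10.seconds)
    } finally {
      localProps.remove() // clears the property on this thread only
    }
  }
  Await.result(request, 30.seconds)
  // Any compensation thread created above keeps the stale value and can be
  // handed another HTTP request later while still carrying it.
}
{noformat}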

The first thread then finishes waiting, finishes {{buildScan}}, and leaves 
{{withNewExecutionId}}, clearing {{spark.sql.execution.id}} in the {{finally}} 
block. All good. But some time later another HTTP request will be served by the 
second thread. That thread is "poisoned" with a stale {{spark.sql.execution.id}}. 
When it tries to use {{withNewExecutionId}}, it fails.

----

I don't know who's at fault here. 

 - I don't like the {{ThreadLocal}} properties anyway. Why not create an 
Execution object and let it wrap the operation? (A rough sketch follows after 
this list.) Then you could have two executions in parallel on the same thread, 
and other things like that. It would be much clearer than storing the execution 
ID in a kind-of-global variable.
 - Why do we have to inherit the {{ThreadLocal}} properties? I'm sure there is 
a good reason, but this is essentially a bug-generator in my view. (It has 
already generated https://issues.apache.org/jira/browse/SPARK-10563.)
 - I never would have thought that {{Await.result}} starts threads.
 - We probably shouldn't be calling {{Await.result}} inside {{buildScan}}.
 - We probably shouldn't call Spark things from HTTP serving threads.
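
To make the first suggestion above concrete, here is a rough sketch of what an 
explicit Execution object could look like. This is purely hypothetical API 
design, not anything that exists in Spark; {{Execution}}, {{run}}, and 
{{buildScanWith}} are invented names:

{noformat}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical alternative to thread-local execution ids: the id travels with
// an explicit Execution value instead of a ThreadLocal, so two executions can
// coexist on one thread and nothing can leak into unrelated threads.
final case class Execution(id: Long, properties: Map[String, String])

object Execution {
  private val nextId = new AtomicLong(0)

  def run[T](properties: Map[String, String] = Map.empty)(body: Execution => T): T = {
    val execution = Execution(nextId.getAndIncrement(), properties)
    // The id is scoped to this call: there is no global state to clean up in
    // a finally block and nothing for a new thread to inherit by accident.
    body(execution)
  }
}

// Usage sketch: callbacks such as a data source's buildScan would receive the
// Execution explicitly instead of reading a thread-local property.
// Execution.run() { execution => buildScanWith(execution) }
{noformat}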

I'm not sure what could be done on the Spark side, but I thought I should 
mention this interesting issue. For supporting evidence, here is the stack trace 
from the moment {{localProperties}} is cloned. Its contents at that point are: {{ 
spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, 
spark.rdd.scope={"id":"4","name":"ExecutedCommand"} }}

{noformat}
  at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
  at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
  at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
  at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
  at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
  at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.concurrent.Await$.result(package.scala:107) 
[org.scala-lang.scala-library-2.10.5.jar:na] 
  at 
com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) 
[biggraph.jar]     
  at 
com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) 
[biggraph.jar]     
  at 
com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46)
 [biggraph.jar]
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46)
 [biggraph.jar]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
 [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
[spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
  at 
com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65)
 [biggraph.jar]
  at scala.util.Try$.apply(Try.scala:161) 
[org.scala-lang.scala-library-2.10.5.jar:na]              
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34)
 [biggraph.jar]
  at 
com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34)
 [biggraph.jar]
  at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [org.scala-lang.scala-library-2.10.5.jar:na]
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[org.scala-lang.scala-library-2.10.5.jar:na]
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [org.scala-lang.scala-library-2.10.5.jar:na]
{noformat}


