Daniel Darabos created SPARK-12964:
--------------------------------------

             Summary: SparkContext.localProperties leaked
                 Key: SPARK-12964
                 URL: https://issues.apache.org/jira/browse/SPARK-12964
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.0
            Reporter: Daniel Darabos
            Priority: Minor
I have a non-deterministic but quite reliable reproduction for a case where {{spark.sql.execution.id}} is leaked. Operations then die with {{spark.sql.execution.id is already set}}. These threads never recover, because nothing ever unsets {{spark.sql.execution.id}} for them. (It's not a case of nested {{withNewExecutionId}} calls.)

I have figured out why this happens. We are inside a {{withNewExecutionId}} block. At some point we call back into user code. (In our case this is a custom data source's {{buildScan}} method.) The user code calls {{scala.concurrent.Await.result}}. Because our thread is a member of a {{ForkJoinPool}} (it is a Play HTTP serving thread), {{Await.result}} starts a new thread. {{SparkContext.localProperties}} is cloned for this new thread, which then sits ready to serve an HTTP request. The first thread finishes waiting, finishes {{buildScan}}, and leaves {{withNewExecutionId}}, clearing {{spark.sql.execution.id}} in the {{finally}} block. All good. But some time later another HTTP request is served by the second thread. That thread is "poisoned" with a stale {{spark.sql.execution.id}}. When it tries to use {{withNewExecutionId}}, it fails.
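To make the mechanism concrete, here is a minimal standalone sketch of it. This is not our production code: the property value, loop counts and sleeps are invented for illustration, and a plain {{InheritableThreadLocal}} stands in for {{SparkContext.localProperties}}. Like the real bug it is non-deterministic; it only misbehaves when the {{ForkJoinPool}} decides to compensate for the blocked worker by constructing a fresh thread.

{code:scala}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object InheritanceSketch {
  // Stand-in for SparkContext.localProperties: every Thread constructed
  // by the current thread receives a copy of the current value.
  val localProperty = new InheritableThreadLocal[String] {
    override def initialValue(): String = "<unset>"
  }

  def main(args: Array[String]): Unit = {
    // The global ExecutionContext is backed by a ForkJoinPool, like the
    // Play HTTP serving threads in this report.
    implicit val ec: ExecutionContext = ExecutionContext.global
    val gate = Promise[Unit]()

    Future {
      // Simulates withNewExecutionId setting the property...
      localProperty.set("spark.sql.execution.id=0")
      // Await.result routes through ForkJoinPool.managedBlock, which may
      // compensate for this blocked worker by constructing a brand-new
      // thread. The new Thread's constructor clones every inheritable
      // thread-local, including ours.
      Await.result(gate.future, 10.seconds)
      // ...and simulates the finally block: clears it on *this* thread only.
      localProperty.remove()
    }

    Thread.sleep(500) // give the pool time to spawn the compensating worker
    gate.success(())
    Thread.sleep(100)

    // Any worker born during the blocking wait still carries the inherited
    // value. Which worker picks up which task is up to the pool, hence the
    // non-determinism.
    for (_ <- 1 to 8) Future {
      println(s"${Thread.currentThread.getName}: ${localProperty.get}")
    }
    Thread.sleep(500)
  }
}
{code}

If one of the printouts shows {{spark.sql.execution.id=0}} from a thread that never called {{set}}, that is the poisoned thread. The stack trace below shows exactly this path in the real system: {{Thread.init}} → {{createInheritedMap}} → {{SparkContext$$anon$2.childValue}}.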
----

I don't know who's at fault here.
- I don't like the {{ThreadLocal}} properties anyway. Why not create an Execution object and let it wrap the operation? (See the sketch at the end of this report.) Then you could have two executions in parallel on the same thread, and other things like that. It would be much clearer than storing the execution ID in a kind-of-global variable.
- Why do we have to inherit the {{ThreadLocal}} properties? I'm sure there is a good reason, but in my view this is essentially a bug generator. (It has already generated https://issues.apache.org/jira/browse/SPARK-10563.)
- {{Await.result}}: I never would have thought it starts threads.
- We probably shouldn't be calling {{Await.result}} inside {{buildScan}}.
- We probably shouldn't call Spark things from HTTP serving threads.

I'm not sure what could be done on the Spark side, but I thought I should mention this interesting issue.

For supporting evidence, here is the stack trace from the moment {{localProperties}} is cloned. Its contents at that point are: {{ spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"} }}

{noformat}
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
{noformat}
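And for what it's worth, here is the rough shape of what I mean above by an Execution object instead of a thread-local. All names here are invented, and it glosses over how the ID would reach the listener bus and task properties; it is a sketch of the idea, not a patch:

{code:scala}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical replacement for SQLExecution.withNewExecutionId: the
// execution is an explicit value, so nothing is stashed on the thread,
// nothing is inherited by child threads, and two executions can
// coexist on one thread.
class Execution private (val id: Long) {
  def run[T](body: Execution => T): T =
    try body(this)
    finally Execution.finish(this) // cleanup is tied to the value, not the thread
}

object Execution {
  private val nextId = new AtomicLong(0)

  def start(): Execution = new Execution(nextId.getAndIncrement())

  private def finish(e: Execution): Unit = {
    // this is where the end-of-execution bookkeeping would go
  }
}

object Example {
  def main(args: Array[String]): Unit = {
    // Anything that needs the execution ID takes it as a parameter
    // instead of reading a kind-of-global variable.
    val result = Execution.start().run { exec =>
      println(s"running as execution ${exec.id}")
      42
    }
    println(result)
  }
}
{code}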