[
https://issues.apache.org/jira/browse/SPARK-12964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-12964.
----------------------------------
Resolution: Incomplete
> SparkContext.localProperties leaked
> -----------------------------------
>
> Key: SPARK-12964
> URL: https://issues.apache.org/jira/browse/SPARK-12964
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.0
> Reporter: Daniel Darabos
> Priority: Minor
> Labels: bulk-closed
>
> I have a non-deterministic but quite reliable reproduction for a case where
> {{spark.sql.execution.id}} is leaked. Operations then die with
> {{spark.sql.execution.id is already set}}. These threads never recover
> because there is nothing to unset {{spark.sql.execution.id}}. (It's not a
> case of nested {{withNewExecutionId}} calls.)
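> For context, the contract of {{withNewExecutionId}} can be sketched as follows. This is a simplified stand-in for Spark's {{SQLExecution.withNewExecutionId}} (the fixed string id and the exception are illustrative, not Spark's exact code); the key point is that only the thread that set the id ever clears it:

```scala
// Simplified sketch of the withNewExecutionId contract: set a thread-local
// execution id, run the body, clear the id in a finally block.
val executionId = new ThreadLocal[String]

def withNewExecutionId[T](body: => T): T = {
  if (executionId.get != null)
    sys.error("spark.sql.execution.id is already set")
  executionId.set("0") // illustrative fixed id
  try body
  finally executionId.remove() // only the thread that set the id cleans up
}

// Normal use: id set, body runs, id cleared afterwards.
val result = withNewExecutionId { 1 + 1 }

// A nested call fails with the error described in this report.
val nestedFails =
  try { withNewExecutionId { withNewExecutionId { 0 } }; false }
  catch { case _: RuntimeException => true }
```

> A thread that merely *inherited* the id never passes through that {{finally}} block, so nothing ever clears it.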
> I have figured out why this happens. We are within a {{withNewExecutionId}}
> block. At some point we call back to user code. (In our case this is a custom
> data source's {{buildScan}} method.) The user code calls
> {{scala.concurrent.Await.result}}. Because our thread is a member of a
> {{ForkJoinPool}} (it is a Play HTTP serving thread), {{Await.result}} starts
> a new thread. {{SparkContext.localProperties}} is cloned for this new thread,
> which is then ready to serve an HTTP request.
> The first thread then finishes waiting, finishes {{buildScan}}, and leaves
> {{withNewExecutionId}}, clearing {{spark.sql.execution.id}} in the
> {{finally}} block. All good. But some time later another HTTP request will be
> served by the second thread. This thread is "poisoned" with a
> {{spark.sql.execution.id}}. When it tries to use {{withNewExecutionId}} it
> fails.
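> The inheritance mechanism can be demonstrated in isolation. A minimal, self-contained sketch follows ({{localProps}} is a hypothetical stand-in for {{SparkContext.localProperties}}, and a plain {{Thread}} stands in for the ForkJoinPool compensation thread that {{Await.result}} creates):

```scala
import java.util.Properties

// Stand-in for SparkContext.localProperties: an InheritableThreadLocal whose
// value is copied into every thread created while it is set.
val localProps = new InheritableThreadLocal[Properties] {
  override def childValue(parent: Properties): Properties = {
    // The child starts with a clone of the parent's properties,
    // mirroring the childValue override in SparkContext.
    val cloned = new Properties()
    if (parent != null) cloned.putAll(parent)
    cloned
  }
}

val props = new Properties()
props.setProperty("spark.sql.execution.id", "0")
localProps.set(props) // we are now "inside withNewExecutionId"

// Any thread constructed from here on inherits the properties at creation.
var childSees: String = null
val t = new Thread(() => {
  childSees = localProps.get.getProperty("spark.sql.execution.id")
})
t.start()
t.join()
// childSees is "0": the new thread is poisoned, and nothing ever clears it.
```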
> ----
> I don't know who's at fault here.
> - I don't like the {{ThreadLocal}} properties anyway. Why not create an
> Execution object and let it wrap the operation? Then you could have two
> executions in parallel on the same thread, and other stuff like that. It
> would be much clearer than storing the execution ID in a kind-of-global
> variable.
> - Why do we have to inherit the {{ThreadLocal}} properties? I'm sure there
> is a good reason, but this is essentially a bug-generator in my view. (It has
> already generated https://issues.apache.org/jira/browse/SPARK-10563.)
> - {{Await.result}} --- I never would have thought it starts threads.
> - We probably shouldn't be calling {{Await.result}} inside {{buildScan}}.
> - We probably shouldn't call Spark things from HTTP serving threads.
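> To illustrate the first suggestion above, here is a hypothetical sketch of an explicit Execution handle passed to the operation instead of a thread-local id (none of these names exist in Spark). Because nothing is attached to the thread, per-call handles cannot collide the way thread-local state does:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical alternative: the execution id lives in an explicit handle,
// not in any thread-local, so nothing is attached to the thread.
final case class Execution(id: Long)

object Execution {
  private val counter = new AtomicLong(0)
  // Each call gets a fresh handle; two executions can overlap on one thread.
  def withNew[T](body: Execution => T): T =
    body(Execution(counter.getAndIncrement()))
}

// Nested (or parallel) executions on the same thread get distinct ids, and
// there is no "already set" state to leak into other threads.
val (outerId, innerId) = Execution.withNew { outer =>
  Execution.withNew { inner => (outer.id, inner.id) }
}
```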
> I'm not sure what could be done on the Spark side, but I thought I should
> mention this interesting issue. For supporting evidence here is the stack
> trace when {{localProperties}} is getting cloned. Its contents at that point
> are:
> {noformat}
> {spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"}}
> {noformat}
> {noformat}
> at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
> at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
> at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
> at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
> at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
> at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
> at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
> at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]
> at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]
> at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
> at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
> at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
> at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
> at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
> at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)