Repository: spark
Updated Branches:
refs/heads/branch-0.9 6665df6b7 -> 2f03fc17c
[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of
Executor.
The constructor of `org.apache.spark.executor.Executor` should not set the
context class loader of the current thread, which is the backend Actor's thread.
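For illustration only (this is not the change made in this commit), a context
class loader can be installed for a limited scope and then restored, so that it
does not leak into whichever thread happens to run the code. The names
`ContextLoaderSketch`, `withContextClassLoader` and `executorLoader` are
hypothetical and do not appear in Spark.

```scala
import java.net.{URL, URLClassLoader}

object ContextLoaderSketch {
  /** Runs `body` with `loader` as the context class loader, then restores the previous one. */
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous)
  }

  def main(args: Array[String]): Unit = {
    val before = Thread.currentThread().getContextClassLoader
    // Hypothetical stand-in for an executor-side loader: no URLs, parent is the current loader.
    val executorLoader = new URLClassLoader(Array.empty[URL], before)

    val seenInside = withContextClassLoader(executorLoader) {
      Thread.currentThread().getContextClassLoader
    }

    println(seenInside eq executorLoader)                           // true: visible inside the block
    println(Thread.currentThread().getContextClassLoader eq before) // true: restored afterwards
  }
}
```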
To reproduce the problem, run the following code in a local-mode REPL:
```
scala> case class Foo(i: Int)
scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
```
This fails with the following error:
```
ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
    at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
    at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
This happens because the class loader used to deserialize the resulting `Foo`
instances might differ from the backend Actor's class loader, whereas the
Actor's class loader should be the same as the Driver's.
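As a rough illustration of the failure mode: the JVM checks the runtime element
type on every array store, and it treats classes with the same name but
different defining class loaders as distinct types. The sketch below triggers
the same `ArrayStoreException` with two unrelated classes (`String` and
`Integer`) instead of two `Foo` classes from different loaders, since the latter
needs a REPL to set up. `ArrayStoreSketch` and `storeFails` are hypothetical
names.

```scala
object ArrayStoreSketch {
  /** Attempts the store and reports whether the JVM rejected it. */
  def storeFails(target: Array[AnyRef], value: AnyRef): Boolean =
    try { target(0) = value; false }
    catch { case _: ArrayStoreException => true }

  def main(args: Array[String]): Unit = {
    // At runtime this is still a String[], even though it is viewed as Array[AnyRef].
    val strings = Array("a").asInstanceOf[Array[AnyRef]]

    println(storeFails(strings, "b"))                         // false: element type matches
    println(storeFails(strings, java.lang.Integer.valueOf(1))) // true: element type mismatch
  }
}
```

In the trace above the rejected value is itself an array of `Foo`
(`[L...Foo;`), which is consistent with a collected partition result being
stored into a results array whose `Foo` came from another class loader.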
Author: Takuya UESHIN <[email protected]>
Closes #15 from ueshin/wip/wrongcontextclassloader and squashes the following
commits:
d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader.
c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl.
43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming
ClassLoader of Executor.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f03fc17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f03fc17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f03fc17
Branch: refs/heads/branch-0.9
Commit: 2f03fc17c3511bc6ef0965c3371d6a3789853486
Parents: 6665df6
Author: Takuya UESHIN <[email protected]>
Authored: Thu Mar 27 22:17:15 2014 -0700
Committer: Sean Owen <[email protected]>
Committed: Sat Mar 14 08:48:47 2015 +0000
----------------------------------------------------------------------
.../main/scala/org/apache/spark/executor/Executor.scala | 2 +-
.../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2f03fc17/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 989806c..255d990 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -288,7 +288,7 @@ private[spark] class Executor(
    * created by the interpreter to the search path
    */
   private def createClassLoader(): ExecutorURLClassLoader = {
-    val loader = this.getClass.getClassLoader
+    val loader = Thread.currentThread().getContextClassLoader
 
     // For each of the jars in the jarSet, add them to the class loader.
     // We assume each of the files has already been fetched.
http://git-wip-us.apache.org/repos/asf/spark/blob/2f03fc17/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8203b8f..4155007 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -242,4 +242,15 @@ class ReplSuite extends FunSuite {
       assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
     }
   }
+
+  test("collecting objects of class defined in repl") {
+    val output = runInterpreter("local[2]",
+      """
+        |case class Foo(i: Int)
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+  }
 }