Repository: spark
Updated Branches:
refs/heads/branch-1.3 47cce984e -> 5c16ced1e
[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running
groupByKey with class defined in REPL
```
case class ClassA(value: String)
val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) ))
rdd.groupByKey.collect
```
This code used to throw an exception in spark-shell, because while shuffling
```JavaSerializer``` uses ```defaultClassLoader```, which was defined like
```env.serializer.setDefaultClassLoader(urlClassLoader)```.
It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like
```
override def run() {
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
```
in TaskRunner.
When ```replClassLoader``` cannot be defined, it is identical to
```urlClassLoader```.
Author: Kevin (Sangwoo) Kim <[email protected]>
Closes #5046 from swkimme/master and squashes the following commits:
fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to
ReplSuite
bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to
ReplSuite
c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer
(cherry picked from commit f0edeae7f9ab7eae02c227be9162ec69d22c92bd)
Signed-off-by: Reynold Xin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c16ced1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c16ced1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c16ced1
Branch: refs/heads/branch-1.3
Commit: 5c16ced1e6c2dcadc0179eda8b273071254e285b
Parents: 47cce98
Author: Kevin (Sangwoo) Kim <[email protected]>
Authored: Mon Mar 16 23:49:23 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Mar 16 23:49:55 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/executor/Executor.scala | 2 +-
.../scala/org/apache/spark/repl/ReplSuite.scala | 50 ++++++++++++--------
.../scala/org/apache/spark/repl/ReplSuite.scala | 50 ++++++++++++--------
3 files changed, 63 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bed0a08..c6ff38d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -104,7 +104,7 @@ private[spark] class Executor(
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
// Set the classloader for serializer
- env.serializer.setDefaultClassLoader(urlClassLoader)
+ env.serializer.setDefaultClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the
block manager
// to send the result back.
http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git
a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 249f438..934daae 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => (new
C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -160,9 +160,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -178,9 +178,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -216,14 +216,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -262,7 +262,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect()
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -275,7 +275,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -287,14 +287,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -309,10 +309,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
- |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
+
+ test("collecting objects of class defined in repl - shuffling") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Foo(i: Int)
+ |val list = List((1, Foo(1)), (1, Foo(2)))
+ |val ret = sc.parallelize(list).groupByKey().collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5c16ced1/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index b3bd135..fbef5b2 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -128,9 +128,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -144,7 +144,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => (new
C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -155,7 +155,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -167,9 +167,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -185,9 +185,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -224,14 +224,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -270,7 +270,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -283,7 +283,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -295,14 +295,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -317,10 +317,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
- |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
+
+ test("collecting objects of class defined in repl - shuffling") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Foo(i: Int)
+ |val list = List((1, Foo(1)), (1, Foo(2)))
+ |val ret = sc.parallelize(list).groupByKey().collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]