Repository: spark
Updated Branches:
  refs/heads/master 9667b9f9c -> f0edeae7f


[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running 
groupByKey with class defined in REPL

```
case class ClassA(value: String)
val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) ))
rdd.groupByKey.collect
```
This code used to throw an exception in spark-shell, because while shuffling 
```JavaSerializer``` uses ```defaultClassLoader```, which was defined like 
```env.serializer.setDefaultClassLoader(urlClassLoader)```.

It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like
```
    override def run() {
      val deserializeStartTime = System.currentTimeMillis()
      Thread.currentThread.setContextClassLoader(replClassLoader)
```
in TaskRunner.

When ```replClassLoader``` cannot be defined, it is identical to 
```urlClassLoader```.

Author: Kevin (Sangwoo) Kim <sangwookim...@gmail.com>

Closes #5046 from swkimme/master and squashes the following commits:

fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to 
ReplSuite
c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0edeae7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0edeae7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0edeae7

Branch: refs/heads/master
Commit: f0edeae7f9ab7eae02c227be9162ec69d22c92bd
Parents: 9667b9f
Author: Kevin (Sangwoo) Kim <sangwookim...@gmail.com>
Authored: Mon Mar 16 23:49:23 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Mar 16 23:49:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 ++++++++++++--------
 .../scala/org/apache/spark/repl/ReplSuite.scala | 50 ++++++++++++--------
 3 files changed, 63 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a897e53..6196f7b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -103,7 +103,7 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
   // Set the classloader for serializer
-  env.serializer.setDefaultClassLoader(urlClassLoader)
+  env.serializer.setDefaultClassLoader(replClassLoader)
 
   // Akka's message frame size. If task result is bigger than this, we use the 
block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 249f438..934daae 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local",
       """
         |var v = 7
-        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
         |class C {
         |def foo = 5
         |}
-        |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => (new 
C).foo).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local",
       """
         |def double(x: Int) = x + x
-        |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -160,9 +160,9 @@ class ReplSuite extends FunSuite {
       """
         |var v = 7
         |def getV() = v
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -178,9 +178,9 @@ class ReplSuite extends FunSuite {
       """
         |var array = new Array[Int](5)
         |val broadcastArray = sc.broadcast(array)
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         |array(0) = 5
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -216,14 +216,14 @@ class ReplSuite extends FunSuite {
       """
         |var v = 7
         |def getV() = v
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |var array = new Array[Int](5)
         |val broadcastArray = sc.broadcast(array)
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         |array(0) = 5
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -262,7 +262,7 @@ class ReplSuite extends FunSuite {
         |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
         |import sqlContext.implicits._
         |case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect()
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -275,7 +275,7 @@ class ReplSuite extends FunSuite {
       |val t = new TestClass
       |import t.testMethod
       |case class TestCaseClass(value: Int)
-      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
     """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -287,14 +287,14 @@ class ReplSuite extends FunSuite {
         """
           |var v = 7
           |def getV() = v
-          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
           |v = 10
-          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
           |var array = new Array[Int](5)
           |val broadcastArray = sc.broadcast(array)
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
           |array(0) = 5
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         """.stripMargin)
       assertDoesNotContain("error:", output)
       assertDoesNotContain("Exception", output)
@@ -309,10 +309,22 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local[2]",
       """
         |case class Foo(i: Int)
-        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
     assertContains("ret: Array[Foo] = Array(Foo(1),", output)
   }
+  
+  test("collecting objects of class defined in repl - shuffling") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |case class Foo(i: Int)
+        |val list = List((1, Foo(1)), (1, Foo(2)))
+        |val ret = sc.parallelize(list).groupByKey().collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f0edeae7/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index b3bd135..fbef5b2 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -128,9 +128,9 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local",
       """
         |var v = 7
-        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -144,7 +144,7 @@ class ReplSuite extends FunSuite {
         |class C {
         |def foo = 5
         |}
-        |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => (new 
C).foo).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -155,7 +155,7 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local",
       """
         |def double(x: Int) = x + x
-        |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -167,9 +167,9 @@ class ReplSuite extends FunSuite {
       """
         |var v = 7
         |def getV() = v
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -185,9 +185,9 @@ class ReplSuite extends FunSuite {
       """
         |var array = new Array[Int](5)
         |val broadcastArray = sc.broadcast(array)
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         |array(0) = 5
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -224,14 +224,14 @@ class ReplSuite extends FunSuite {
       """
         |var v = 7
         |def getV() = v
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |v = 10
-        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
         |var array = new Array[Int](5)
         |val broadcastArray = sc.broadcast(array)
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         |array(0) = 5
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -270,7 +270,7 @@ class ReplSuite extends FunSuite {
         |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
         |import sqlContext.implicits._
         |case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -283,7 +283,7 @@ class ReplSuite extends FunSuite {
       |val t = new TestClass
       |import t.testMethod
       |case class TestCaseClass(value: Int)
-      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
     """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
@@ -295,14 +295,14 @@ class ReplSuite extends FunSuite {
         """
           |var v = 7
           |def getV() = v
-          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
           |v = 10
-          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
           |var array = new Array[Int](5)
           |val broadcastArray = sc.broadcast(array)
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
           |array(0) = 5
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
         """.stripMargin)
       assertDoesNotContain("error:", output)
       assertDoesNotContain("Exception", output)
@@ -317,10 +317,22 @@ class ReplSuite extends FunSuite {
     val output = runInterpreter("local[2]",
       """
         |case class Foo(i: Int)
-        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
     assertContains("ret: Array[Foo] = Array(Foo(1),", output)
   }
+  
+  test("collecting objects of class defined in repl - shuffling") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |case class Foo(i: Int)
+        |val list = List((1, Foo(1)), (1, Foo(2)))
+        |val ret = sc.parallelize(list).groupByKey().collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to