Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4b8dc2556 -> f802b07ab


[SPARK-11195][CORE] Use correct classloader for TaskResultGetter

Make sure we are using the context classloader when deserializing failed 
TaskResults instead of the Spark classloader.

The issue is that `enqueueFailedTask` was using the incorrect classloader which 
results in `ClassNotFoundException`.

Adds a test in TaskResultGetterSuite that compiles a custom exception, throws 
it on the executor, and asserts that Spark handles the TaskResult 
deserialization instead of returning `UnknownReason`.

See #9367 for previous comments
See SPARK-11195 for a full repro

Author: Hurshal Patel <[email protected]>

Closes #9779 from choochootrain/spark-11195-master.

(cherry picked from commit 3cca5ffb3d60d5de9a54bc71cf0b8279898936d2)
Signed-off-by: Yin Huai <[email protected]>

Conflicts:
        core/src/main/scala/org/apache/spark/TestUtils.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f802b07a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f802b07a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f802b07a

Branch: refs/heads/branch-1.5
Commit: f802b07ab36756c8b4976fadecf66169dd1d7d68
Parents: 4b8dc25
Author: Hurshal Patel <[email protected]>
Authored: Wed Nov 18 09:28:59 2015 -0800
Committer: Yin Huai <[email protected]>
Committed: Wed Nov 18 09:36:33 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/TestUtils.scala | 11 ++--
 .../spark/scheduler/TaskResultGetter.scala      |  4 +-
 .../spark/scheduler/TaskResultGetterSuite.scala | 65 +++++++++++++++++++-
 3 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f802b07a/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a1ebbec..9ffdc89 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
+import java.nio.file.Paths
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
@@ -78,15 +79,15 @@ private[spark] object TestUtils {
   }
 
   /**
-   * Create a jar file that contains this set of files. All files will be 
located at the root
-   * of the jar.
+   * Create a jar file that contains this set of files. All files will be 
located in the specified
+   * directory or at the root of the jar.
    */
-  def createJar(files: Seq[File], jarFile: File): URL = {
+  def createJar(files: Seq[File], jarFile: File, directoryPrefix: 
Option[String] = None): URL = {
     val jarFileStream = new FileOutputStream(jarFile)
     val jarStream = new JarOutputStream(jarFileStream, new 
java.util.jar.Manifest())
 
     for (file <- files) {
-      val jarEntry = new JarEntry(file.getName)
+      val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), 
file.getName).toString)
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
@@ -118,7 +119,7 @@ private[spark] object TestUtils {
       classpathUrls: Seq[URL]): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
 
-    // Calling this outputs a class file in pwd. It's easier to just rename 
the file than
+    // Calling this outputs a class file in pwd. It's easier to just rename 
the files than
     // build a custom FileManager that controls the output location.
     val options = if (classpathUrls.nonEmpty) {
       Seq("-classpath", classpathUrls.map { _.getFile 
}.mkString(File.pathSeparator))

http://git-wip-us.apache.org/repos/asf/spark/blob/f802b07a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 46a6f65..f496599 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -103,16 +103,16 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
     try {
       getTaskResultExecutor.execute(new Runnable {
         override def run(): Unit = Utils.logUncaughtExceptions {
+          val loader = Utils.getContextOrSparkClassLoader
           try {
             if (serializedData != null && serializedData.limit() > 0) {
               reason = serializer.get().deserialize[TaskEndReason](
-                serializedData, Utils.getSparkClassLoader)
+                serializedData, loader)
             }
           } catch {
             case cnd: ClassNotFoundException =>
               // Log an error but keep going here -- the task failed, so not 
catastrophic
               // if we can't deserialize the reason.
-              val loader = Utils.getContextOrSparkClassLoader
               logError(
                 "Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
             case ex: Exception => {}

http://git-wip-us.apache.org/repos/asf/spark/blob/f802b07a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 815caa7..bc72c36 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.io.File
+import java.net.URL
 import java.nio.ByteBuffer
 
 import scala.concurrent.duration._
@@ -26,8 +28,10 @@ import scala.util.control.NonFatal
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, 
SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.storage.TaskResultBlockId
+import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /**
  * Removes the TaskResult from the BlockManager before delegating to a normal 
TaskResultGetter.
@@ -119,5 +123,64 @@ class TaskResultGetterSuite extends SparkFunSuite with 
BeforeAndAfter with Local
     // Make sure two tasks were run (one failed one, and a second retried one).
     assert(scheduler.nextTaskId.get() === 2)
   }
+
+  /**
+   * Make sure we are using the context classloader when deserializing failed 
TaskResults instead
+   * of the Spark classloader.
+
+   * This test compiles a jar containing an exception and tests that when it 
is thrown on the
+   * executor, enqueueFailedTask can correctly deserialize the failure and 
identify the thrown
+   * exception as the cause.
+
+   * Before this fix, enqueueFailedTask would throw a ClassNotFoundException 
when deserializing
+   * the exception, resulting in an UnknownReason for the TaskEndResult.
+   */
+  test("failed task deserialized with the correct classloader (SPARK-11195)") {
+    // compile a small jar containing an exception that will be thrown on an 
executor.
+    val tempDir = Utils.createTempDir()
+    val srcDir = new File(tempDir, "repro/")
+    srcDir.mkdirs()
+    val excSource = new JavaSourceFromString(new File(srcDir, 
"MyException").getAbsolutePath,
+      """package repro;
+        |
+        |public class MyException extends Exception {
+        |}
+      """.stripMargin)
+    val excFile = TestUtils.createCompiledClass("MyException", srcDir, 
excSource, Seq.empty)
+    val jarFile = new File(tempDir, 
"testJar-%s.jar".format(System.currentTimeMillis()))
+    TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("repro"))
+
+    // ensure we reset the classloader after the test completes
+    val originalClassLoader = Thread.currentThread.getContextClassLoader
+    try {
+      // load the exception from the jar
+      val loader = new MutableURLClassLoader(new Array[URL](0), 
originalClassLoader)
+      loader.addURL(jarFile.toURI.toURL)
+      Thread.currentThread().setContextClassLoader(loader)
+      val excClass: Class[_] = Utils.classForName("repro.MyException")
+
+      // NOTE: we must run the cluster with "local" so that the executor can 
load the compiled
+      // jar.
+      sc = new SparkContext("local", "test", conf)
+      val rdd = sc.parallelize(Seq(1), 1).map { _ =>
+        val exc = excClass.newInstance().asInstanceOf[Exception]
+        throw exc
+      }
+
+      // the driver should not have any problems resolving the exception class 
and determining
+      // why the task failed.
+      val exceptionMessage = intercept[SparkException] {
+        rdd.collect()
+      }.getMessage
+
+      val expectedFailure = """(?s).*Lost task.*: repro.MyException.*""".r
+      val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
+
+      assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
+      assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
+    } finally {
+      Thread.currentThread.setContextClassLoader(originalClassLoader)
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to