Repository: spark
Updated Branches:
  refs/heads/master a9beeaaae -> e9fc0b6a8


[SPARK-16787] SparkContext.addFile() should not throw if called twice with the same file

## What changes were proposed in this pull request?

The behavior of `SparkContext.addFile()` changed slightly with the 
Netty-RPC-based file server, which was introduced in Spark 1.6 (where it was 
disabled by default) and became the default / only file server in Spark 2.0.0.

Prior to 2.0, calling `SparkContext.addFile()` twice with files that have the 
same name and identical contents would succeed. This behavior was never 
explicitly documented, but Spark has behaved this way since very early 1.x 
versions.

In 2.0 (or 1.6 with the Netty file server enabled), the second `addFile()` call 
will fail with a requirement error because NettyStreamManager tries to guard 
against duplicate file registration.

This problem also affects `addJar()` in a more subtle way: the 
`fileServer.addJar()` call will also fail with an exception but that exception 
is logged and ignored; I believe that the problematic exception-catching path 
was mistakenly copied from some old code which was only relevant to very old 
versions of Spark and YARN mode.
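
As a rough illustration of why a catch-all handler hides this failure, here is a minimal sketch. The `register` helper and the `SwallowedFailureSketch` object are hypothetical stand-ins, not Spark's actual code; only the log-and-return-`null` shape mirrors the path described above.

```scala
object SwallowedFailureSketch {
  // Hypothetical registration step that always fails, standing in for a
  // duplicate fileServer.addJar() call.
  private def register(path: String): String =
    throw new IllegalArgumentException(s"JAR $path already registered.")

  // Catch-all handler: the failure is logged and replaced by null, so the
  // caller never sees an exception.
  def addJarKey(path: String): String =
    try register(path) catch {
      case e: Exception =>
        Console.err.println(s"Error adding jar ($e)")
        null
    }
}
```

With this shape, the caller only observes a `null` key, which is easy to miss compared to the exception that `addFile()` surfaces.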

I believe that this change of behavior was unintentional, so this patch weakens 
the `require` check so that adding the same filename at the same path will 
succeed.
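
The difference between the old and the weakened check can be sketched roughly as follows. `FileRegistrySketch` is a hypothetical stand-in for the file server's registry, assuming a `ConcurrentHashMap` keyed by file name; it is not the real `NettyStreamManager`.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry keyed by file name; for illustration only.
class FileRegistrySketch {
  private val files = new ConcurrentHashMap[String, File]()

  // Old behavior: any second registration under the same name fails.
  def addFileStrict(file: File): Unit = {
    require(files.putIfAbsent(file.getName, file) == null,
      s"File ${file.getName} already registered.")
  }

  // Weakened check: re-registering the same path succeeds, but a
  // different path with the same file name is still rejected.
  def addFileLenient(file: File): Unit = {
    val existing = files.putIfAbsent(file.getName, file)
    require(existing == null || existing == file,
      s"File ${file.getName} was already registered with a different path " +
        s"(old path = $existing, new path = $file)")
  }
}
```

In both variants a failed `require` throws `IllegalArgumentException`; the lenient variant simply narrows the cases in which that happens.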

At file download time, Spark tasks will fail with exceptions if an executor 
already has a local copy of a file and that file's contents do not match the 
contents of the file being downloaded / added. As a result, it's important that 
we prevent files with the same name and different contents from being served, 
because allowing that can effectively brick an executor by preventing it from 
successfully launching any new tasks. Before this patch's change, this was 
prevented by forbidding `addFile()` from being called twice on files with the 
same name.

Because Spark does not defensively copy local files that are passed to 
`addFile()`, it is vulnerable to files' contents changing, so I think it's okay 
to rely on an implicit assumption that these files are intended to be 
immutable: if they _are_ mutable, this can lead to either explicit task 
failures or implicit incorrectness (new executors silently get newer copies of 
the file while old executors continue to use an older version). To guard 
against this, I have decided to only update the file addition timestamps on the 
first call to `addFile()`; duplicate calls will succeed but will not update the 
timestamp. This behavior is fine as long as we assume files are immutable, 
which seems reasonable given the behaviors described above.
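
The "first call wins" timestamp rule can be sketched with `putIfAbsent`. This is a minimal sketch: `AddedFilesSketch`, `register`, and `timestampOf` are hypothetical names, and `scala.collection.JavaConverters` is used to match the era of this patch (it is deprecated in newer Scala versions).

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object AddedFilesSketch {
  // Maps a file key to the timestamp recorded when it was first added.
  private val addedFiles = new ConcurrentHashMap[String, Long]().asScala

  // Returns true only for the first registration of `key`; later calls
  // succeed without overwriting the stored timestamp.
  def register(key: String, timestamp: Long): Boolean =
    addedFiles.putIfAbsent(key, timestamp).isEmpty

  // Looks up the timestamp stored for `key`, if any.
  def timestampOf(key: String): Option[Long] = addedFiles.get(key)
}
```

A caller can use the boolean result to decide whether to do first-time work such as fetching the file or posting an environment update.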

As part of this change, I also improved the thread-safety of the `addedJars` 
and `addedFiles` maps; this is important because these maps may be concurrently 
read by a task-launching thread and written by a driver thread in case the 
user's driver code is multi-threaded.
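
To illustrate the concurrency property this relies on, the sketch below has several threads race to register the same key in a `ConcurrentHashMap`-backed map. The `ConcurrentAddSketch` object and its key name are hypothetical; the point is only that `putIfAbsent` is atomic, so exactly one thread sees itself as the first writer.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._

object ConcurrentAddSketch {
  // Runs `threads` concurrent registrations of the same key and returns
  // how many of them observed themselves as the first writer.
  def countFirstWriters(threads: Int): Int = {
    val added = new ConcurrentHashMap[String, Long]().asScala
    val winners = new AtomicInteger(0)
    val start = new CountDownLatch(1)
    val workers = (1 to threads).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          start.await() // hold all threads until released together
          if (added.putIfAbsent("shared-key", i.toLong).isEmpty) {
            winners.incrementAndGet()
          }
        }
      })
    }
    workers.foreach(_.start())
    start.countDown()
    workers.foreach(_.join())
    winners.get()
  }
}
```

A plain `scala.collection.mutable.HashMap` offers no such guarantee under concurrent access, which is the motivation for switching the map type.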

## How was this patch tested?

I added regression tests in `SparkContextSuite`.

Author: Josh Rosen <joshro...@databricks.com>

Closes #14396 from JoshRosen/SPARK-16787.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9fc0b6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9fc0b6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9fc0b6a

Branch: refs/heads/master
Commit: e9fc0b6a8b4ce62cab56d18581f588c67b811f5b
Parents: a9beeaa
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Aug 2 12:02:11 2016 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Aug 2 12:02:11 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 36 ++++++--------
 .../spark/rpc/netty/NettyStreamManager.scala    | 12 +++--
 .../scala/org/apache/spark/scheduler/Task.scala |  5 +-
 .../org/apache/spark/SparkContextSuite.scala    | 51 ++++++++++++++++++++
 4 files changed, 78 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e9fc0b6a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d48e2b4..48126c2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
 import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def env: SparkEnv = _env
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
-  private[spark] val addedFiles = HashMap[String, Long]()
-  private[spark] val addedJars = HashMap[String, Long]()
+  private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
+  private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
-    addedFiles(key) = timestamp
-
-    // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration, timestamp, useCache = false)
-
-    logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-    postEnvironmentUpdate()
+    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+      logInfo(s"Added file $path at $key with timestamp $timestamp")
+      // Fetch the file locally so that closures which are run on the driver can still use the
+      // SparkFiles API to access files.
+      Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+        hadoopConfiguration, timestamp, useCache = false)
+      postEnvironmentUpdate()
+    }
   }
 
   /**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
                 case exc: FileNotFoundException =>
                   logError(s"Jar not found at $path")
                   null
-                case e: Exception =>
-                  // For now just log an error but allow to go through so spark examples work.
-                  // The spark examples don't really need the jar distributed since its also
-                  // the app jar.
-                  logError("Error adding jar (" + e + "), was the --addJars option used?")
-                  null
               }
             }
           // A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         }
       }
       if (key != null) {
-        addedJars(key) = System.currentTimeMillis
-        logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+        val timestamp = System.currentTimeMillis
+        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+          postEnvironmentUpdate()
+        }
       }
     }
-    postEnvironmentUpdate()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e9fc0b6a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index afcb023..780fadd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   }
 
   override def addFile(file: File): String = {
-    require(files.putIfAbsent(file.getName(), file) == null,
-      s"File ${file.getName()} already registered.")
+    val existingPath = files.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
-    require(jars.putIfAbsent(file.getName(), file) == null,
-      s"JAR ${file.getName()} already registered.")
+    val existingPath = jars.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9fc0b6a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 15f863b..35c4daf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 import java.util.Properties
 
+import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
    */
   def serializeWithDependencies(
       task: Task[_],
-      currentFiles: HashMap[String, Long],
-      currentJars: HashMap[String, Long],
+      currentFiles: mutable.Map[String, Long],
+      currentJars: mutable.Map[String, Long],
       serializer: SerializerInstance)
     : ByteBuffer = {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9fc0b6a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 4fa3cab..f8d143d 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("cannot call addFile with different paths that have the same filename") {
+    val dir = Utils.createTempDir()
+    try {
+      val subdir1 = new File(dir, "subdir1")
+      val subdir2 = new File(dir, "subdir2")
+      assert(subdir1.mkdir())
+      assert(subdir2.mkdir())
+      val file1 = new File(subdir1, "file")
+      val file2 = new File(subdir2, "file")
+      Files.write("old", file1, StandardCharsets.UTF_8)
+      Files.write("new", file2, StandardCharsets.UTF_8)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test")
+      sc.addFile(file1.getAbsolutePath)
+      def getAddedFileContents(): String = {
+        sc.parallelize(Seq(0)).map { _ =>
+          scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+        }.first()
+      }
+      assert(getAddedFileContents() === "old")
+      intercept[IllegalArgumentException] {
+        sc.addFile(file2.getAbsolutePath)
+      }
+      assert(getAddedFileContents() === "old")
+    } finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
+
+  // Regression tests for SPARK-16787
+  for (
+    schedulingMode <- Seq("local-mode", "non-local-mode");
+    method <- Seq("addJar", "addFile")
+  ) {
+    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
+    val master = schedulingMode match {
+      case "local-mode" => "local"
+      case "non-local-mode" => "local-cluster[1,1,1024]"
+    }
+    test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
+      sc = new SparkContext(master, "test")
+      method match {
+        case "addJar" =>
+          sc.addJar(jarPath)
+          sc.addJar(jarPath)
+        case "addFile" =>
+          sc.addFile(jarPath)
+          sc.addFile(jarPath)
+      }
+    }
+  }
+
   test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))


