Repository: spark
Updated Branches:
  refs/heads/branch-1.6 41ad8aced -> 1fbca4120


[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

This PR fixes the issue by percent-encoding the file name when the file server builds a download URI (Utils.encodeFileNameToURIRawPath) and decoding it again when Utils.fetchFile extracts the target file name (Utils.decodeFileNameInURI), so names containing characters such as spaces round-trip correctly.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10208 from zsxwing/uri.

(cherry picked from commit 86e405f357711ae93935853a912bc13985c259db)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
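
A minimal sketch of the failure mode, using nothing but java.net.URI (the
host and port are made up for illustration): before this change the file
server returned the file name verbatim, and a name containing a space
yields a string that URI refuses to parse.

    import java.net.URI

    object BrokenUrlDemo {
      def main(args: Array[String]): Unit = {
        // Pre-fix shape of an addFile result for a file named "abc xyz".
        val url = "http://192.168.1.1:34567/files/abc xyz"
        try {
          new URI(url)
        } catch {
          case e: java.net.URISyntaxException =>
            println(s"invalid URI: ${e.getMessage}")
        }
      }
    }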


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fbca412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fbca412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fbca412

Branch: refs/heads/branch-1.6
Commit: 1fbca41200d6e73cb276d5949b894881c700323f
Parents: 41ad8ac
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 17 09:55:37 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 17 09:55:46 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpFileServer.scala |  6 ++---
 .../spark/rpc/netty/NettyStreamManager.scala    |  5 ++--
 .../scala/org/apache/spark/util/Utils.scala     | 26 +++++++++++++++++++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |  4 +++
 .../org/apache/spark/util/UtilsSuite.scala      | 11 +++++++++
 5 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 7cf7bc0..ee9bfea 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
 
   def addFile(file: File) : String = {
     addFileToDir(file, fileDir)
-    serverUri + "/files/" + file.getName
+    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addJar(file: File) : String = {
     addFileToDir(file, jarDir)
-    serverUri + "/jars/" + file.getName
+    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addFileToDir(file: File, dir: File) : String = {
@@ -80,7 +80,7 @@ private[spark] class HttpFileServer(
       throw new IllegalArgumentException(s"$file cannot be a directory.")
     }
     Files.copy(file, new File(dir, file.getName))
-    dir + "/" + file.getName
+    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
 }
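
The encoding half of the fix, sketched standalone so it runs without Spark
(the server address is made up; encodeFileNameToURIRawPath is reproduced
from the patch):

    import java.net.URI

    object EncodeDemo {
      // From the patch: percent-encode a bare file name so it is safe to
      // splice into a URI path. The multi-argument URI constructor quotes
      // illegal characters; getRawPath preserves the quoting.
      def encodeFileNameToURIRawPath(fileName: String): String = {
        require(!fileName.contains("/") && !fileName.contains("\\"))
        new URI("file", null, "localhost", -1, "/" + fileName, null, null)
          .getRawPath.substring(1)
      }

      def main(args: Array[String]): Unit = {
        val serverUri = "http://192.168.1.1:34567"  // made-up address
        val url = serverUri + "/files/" + encodeFileNameToURIRawPath("abc xyz")
        println(url)                      // .../files/abc%20xyz
        println(new URI(url).getRawPath)  // parses fine: /files/abc%20xyz
      }
    }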

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index a2768b4..5343482 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.server.StreamManager
 import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
@@ -51,13 +52,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   override def addFile(file: File): String = {
     require(files.putIfAbsent(file.getName(), file) == null,
       s"File ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
     require(jars.putIfAbsent(file.getName(), file) == null,
       s"JAR ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index af63234..6cb52fb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -318,6 +318,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * A file name may contain some invalid URI characters, such as " ". This method will convert the
+   * file name to a raw path accepted by `java.net.URI(String)`.
+   *
+   * Note: the file name must not contain "/" or "\"
+   */
+  def encodeFileNameToURIRawPath(fileName: String): String = {
+    require(!fileName.contains("/") && !fileName.contains("\\"))
+    // `file` and `localhost` are not used. Just to prevent URI from parsing `fileName` as
+    // scheme or host. The prefix "/" is required because URI doesn't accept a relative path.
+    // We should remove it after we get the raw path.
+    new URI("file", null, "localhost", -1, "/" + fileName, null, null).getRawPath.substring(1)
+  }
+
+  /**
+   * Get the file name from uri's raw path and decode it. If the raw path of uri ends with "/",
+   * return the name before the last "/".
+   */
+  def decodeFileNameInURI(uri: URI): String = {
+    val rawPath = uri.getRawPath
+    val rawFileName = rawPath.split("/").last
+    new URI("file:///" + rawFileName).getPath.substring(1)
+  }
+
+  /**
    * Download a file or directory to target directory. Supports fetching the file in a variety of
    * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
    * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -338,7 +362,7 @@ private[spark] object Utils extends Logging {
       hadoopConf: Configuration,
       timestamp: Long,
       useCache: Boolean) {
-    val fileName = url.split("/").last
+    val fileName = decodeFileNameInURI(new URI(url))
     val targetFile = new File(targetDir, fileName)
     val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
     if (useCache && fetchCacheEnabled) {
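
The decoding half is why the one-line change in fetchFile matters: even
for a correctly encoded URL, the old `url.split("/").last` returns the
still-encoded segment, which no longer matches the name the file was
registered under. A standalone sketch (decodeFileNameInURI reproduced from
the patch; the URL is made up):

    import java.net.URI

    object DecodeDemo {
      // From the patch: take the last raw path segment and percent-decode
      // it back to the original file name via URI.getPath.
      def decodeFileNameInURI(uri: URI): String = {
        val rawPath = uri.getRawPath
        val rawFileName = rawPath.split("/").last
        new URI("file:///" + rawFileName).getPath.substring(1)
      }

      def main(args: Array[String]): Unit = {
        val url = "http://192.168.1.1:34567/files/abc%20xyz"
        println(url.split("/").last)                // abc%20xyz (old behavior)
        println(decodeFileNameInURI(new URI(url)))  // abc xyz   (on-disk name)
      }
    }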

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 2bb4ac1..e4ae7fb 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -771,12 +771,15 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val tempDir = Utils.createTempDir()
     val file = new File(tempDir, "file")
     Files.write(UUID.randomUUID().toString(), file, UTF_8)
+    val fileWithSpecialChars = new File(tempDir, "file name")
+    Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
     val empty = new File(tempDir, "empty")
     Files.write("", empty, UTF_8);
     val jar = new File(tempDir, "jar")
     Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
     val fileUri = env.fileServer.addFile(file)
+    val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
     val emptyUri = env.fileServer.addFile(empty)
     val jarUri = env.fileServer.addJar(jar)
 
@@ -786,6 +789,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val files = Seq(
       (file, fileUri),
+      (fileWithSpecialChars, fileWithSpecialCharsUri),
       (empty, emptyUri),
       (jar, jarUri))
     files.foreach { case (f, uri) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 68b0da7..fdb51d4 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -734,4 +734,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       conf.set("spark.executor.instances", "0")) === true)
   }
 
+  test("encodeFileNameToURIRawPath") {
+    assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
+    assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")
+    assert(Utils.encodeFileNameToURIRawPath("abc:xyz") === "abc:xyz")
+  }
+
+  test("decodeFileNameInURI") {
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc/xyz")) === "xyz")
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc")) === "abc")
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc%20xyz")) === "abc 
xyz")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
