Repository: spark
Updated Branches:
  refs/heads/master 53e48f73e -> edcb878e2


[SPARK-20338][CORE] Spaces in spark.eventLog.dir are not correctly handled

## What changes were proposed in this pull request?

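Make “spark.eventLog.dir” support paths that contain space characters. The event log URI is now derived from a Hadoop `Path` (`path.toUri`) instead of being parsed directly with `new URI(...)`, which rejects unescaped spaces.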

1. Update EventLoggingListenerSuite to use a temp directory whose name contains a space: `testDir = Utils.createTempDir(namePrefix = s"history log")`
2. Fix EventLoggingListenerSuite tests

## How was this patch tested?

Updated the existing unit tests.

Author: zuotingbing <[email protected]>

Closes #18285 from zuotingbing/spark-resolveURI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edcb878e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edcb878e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edcb878e

Branch: refs/heads/master
Commit: edcb878e2fbd0d85bf70614fed37f4cbf0caa95e
Parents: 53e48f7
Author: zuotingbing <[email protected]>
Authored: Fri Jun 16 10:34:52 2017 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Jun 16 10:34:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/EventLoggingListener.scala     | 4 ++--
 .../apache/spark/deploy/history/FsHistoryProviderSuite.scala  | 5 ++---
 .../apache/spark/scheduler/EventLoggingListenerSuite.scala    | 7 +++----
 3 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/edcb878e/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index f481436..35690b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -96,8 +96,8 @@ private[spark] class EventLoggingListener(
     }
 
     val workingPath = logPath + IN_PROGRESS
-    val uri = new URI(workingPath)
     val path = new Path(workingPath)
+    val uri = path.toUri
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
@@ -320,7 +320,7 @@ private[spark] object EventLoggingListener extends Logging {
       appId: String,
       appAttemptId: Option[String],
       compressionCodecName: Option[String] = None): String = {
-    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val base = new Path(logBaseDir).toString.stripSuffix("/") + "/" + 
sanitize(appId)
     val codec = compressionCodecName.map("." + _).getOrElse("")
     if (appAttemptId.isDefined) {
       base + "_" + sanitize(appAttemptId.get) + codec

http://git-wip-us.apache.org/repos/asf/spark/blob/edcb878e/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9b3e4ec..7109146 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.history
 
 import java.io._
-import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
@@ -27,7 +26,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -63,7 +62,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
     val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, 
appAttemptId)
-    val logPath = new URI(logUri).getPath + ip
+    val logPath = new Path(logUri).toUri.getPath + ip
     new File(logPath)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/edcb878e/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 4cae6c6..0afd07b 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, FileOutputStream, InputStream, IOException}
-import java.net.URI
 
 import scala.collection.mutable
 import scala.io.Source
@@ -52,7 +51,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
   private var testDirPath: Path = _
 
   before {
-    testDir = Utils.createTempDir()
+    testDir = Utils.createTempDir(namePrefix = s"history log")
     testDir.deleteOnExit()
     testDirPath = new Path(testDir.getAbsolutePath())
   }
@@ -111,7 +110,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
 
   test("Log overwriting") {
     val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
-    val logPath = new URI(logUri).getPath
+    val logPath = new Path(logUri).toUri.getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
     // Expected IOException, since we haven't enabled log overwrite.
@@ -293,7 +292,7 @@ object EventLoggingListenerSuite {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    conf.set("spark.eventLog.dir", logDir.toUri.toString)
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to