Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d365fbf94 -> 8100cbdb7


SPARK-1677: allow user to disable output dir existence checking

https://issues.apache.org/jira/browse/SPARK-1677

For compatibility with older versions of Spark, it would be nice to have an
option `spark.hadoop.validateOutputSpecs` (default: true) allowing the user to
disable the output directory existence checking.

Author: CodingCat <[email protected]>

Closes #947 from CodingCat/SPARK-1677 and squashes the following commits:

7930f83 [CodingCat] miao
c0c0e03 [CodingCat] bug fix and doc update
5318562 [CodingCat] bug fix
13219b5 [CodingCat] allow user to disable output dir existence checking
(cherry picked from commit 89cdbb087cb2f0d03be2dd77440300c6bd61c792)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8100cbdb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8100cbdb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8100cbdb

Branch: refs/heads/branch-1.0
Commit: 8100cbdb7546e8438019443cfc00683017c81278
Parents: d365fbf
Author: CodingCat <[email protected]>
Authored: Thu Jun 5 11:39:35 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu Jun 5 11:39:43 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 ++++--
 .../test/scala/org/apache/spark/FileSuite.scala | 22 ++++++++++++++++++++
 docs/configuration.md                           |  8 +++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef7..c405641 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -689,7 +689,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -755,7 +756,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " 
+
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1f2206b..070e974 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (old 
Hadoop API") {
+    val sf = new SparkConf()
+    
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs",
 "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, 
"c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), 
("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (new 
Hadoop API") {
+    val sf = new SparkConf()
+    
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs",
 "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), 
("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, 
String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === 
true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, 
String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === 
true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), 
("key3", "b"), ("key4", "c")), 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0697f7f..71fafa5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also 
available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+    <td>spark.hadoop.validateOutputSpecs</td>
+    <td>true</td>
+    <td>If set to true, validates the output specification (e.g. checking if 
the output directory already exists) 
+    used in saveAsHadoopFile and other variants. This can be disabled to 
silence exceptions due to pre-existing 
+    output directories. We recommend that users do not disable this except if 
trying to achieve compatibility with 
+    previous versions of Spark. Simply use Hadoop's FileSystem API to delete 
output directories by hand.</td>
+</tr>
 </table>
 
 #### Networking

Reply via email to