Repository: spark Updated Branches: refs/heads/branch-1.0 d365fbf94 -> 8100cbdb7
SPARK-1677: allow user to disable output dir existence checking https://issues.apache.org/jira/browse/SPARK-1677 For compatibility with older versions of Spark it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default true) for the user to disable the output directory existence checking Author: CodingCat <[email protected]> Closes #947 from CodingCat/SPARK-1677 and squashes the following commits: 7930f83 [CodingCat] miao c0c0e03 [CodingCat] bug fix and doc update 5318562 [CodingCat] bug fix 13219b5 [CodingCat] allow user to disable output dir existence checking (cherry picked from commit 89cdbb087cb2f0d03be2dd77440300c6bd61c792) Signed-off-by: Patrick Wendell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8100cbdb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8100cbdb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8100cbdb Branch: refs/heads/branch-1.0 Commit: 8100cbdb7546e8438019443cfc00683017c81278 Parents: d365fbf Author: CodingCat <[email protected]> Authored: Thu Jun 5 11:39:35 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Thu Jun 5 11:39:43 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 ++++-- .../test/scala/org/apache/spark/FileSuite.scala | 22 ++++++++++++++++++++ docs/configuration.md | 8 +++++++ 3 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 223fef7..c405641 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -689,7 +689,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance - if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } @@ -755,7 +756,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(conf) conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf) http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 1f2206b..070e974 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext { } } + test ("allow user to disable the output directory existence checking (old Hadoop API") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(sf) + val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + randomRDD.saveAsTextFile(tempDir.getPath + "/output") + assert(new 
File(tempDir.getPath + "/output/part-00000").exists() === true) + randomRDD.saveAsTextFile(tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) + } + test ("prevent user from overwriting the empty directory (new Hadoop API)") { sc = new SparkContext("local", "test") val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) @@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext { } } + test ("allow user to disable the output directory existence checking (new Hadoop API") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(sf) + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) + } + test ("save Hadoop Dataset through old Hadoop API") { sc = new SparkContext("local", "test") val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) http://git-wip-us.apache.org/repos/asf/spark/blob/8100cbdb/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 0697f7f..71fafa5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful this duration will be cleared as well. </td> </tr> +<tr> + <td>spark.hadoop.validateOutputSpecs</td> + <td>true</td> + <td>If set to true, validates the output specification (e.g. 
checking if the output directory already exists) + used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing + output directories. We recommend that users do not disable this unless they are trying to achieve compatibility with + previous versions of Spark. Instead, simply use Hadoop's FileSystem API to delete output directories by hand.</td> +</tr> </table> #### Networking
