Repository: spark
Updated Branches:
  refs/heads/master e767d7dda -> 939ba1f8f


[SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs

This patch disables output spec. validation for jobs launched through Spark 
Streaming, since that validation interferes with checkpoint recovery.

Hadoop OutputFormats have a `checkOutputSpecs` method which performs certain 
checks prior to writing output, such as checking whether the output directory 
already exists.  SPARK-1100 added checks for FileOutputFormat, SPARK-1677 
(#947) added a SparkConf configuration to disable these checks, and SPARK-2309 
(#1088) extended these checks to run for all OutputFormats, not just 
FileOutputFormat.

In Spark Streaming, we might have to re-process a batch during checkpoint 
recovery, so `save` actions may be called multiple times.  In addition to 
`DStream`'s own save actions, users might use `transform` or `foreachRDD` and 
call the `RDD` and `PairRDD` save actions.  When output spec. validation is 
enabled, any repeated call to these actions fails because the output written 
by the first call already exists.

This patch automatically disables output spec. validation for jobs submitted by 
the Spark Streaming scheduler.  This is done by using Scala's `DynamicVariable` 
to propagate the bypass setting without having to mutate SparkConf or introduce 
a global variable.

Author: Josh Rosen <[email protected]>

Closes #3832 from JoshRosen/SPARK-4835 and squashes the following commits:

36eaf35 [Josh Rosen] Add comment explaining use of transform() in test.
6485cf8 [Josh Rosen] Add test case in Streaming; fix bug for transform()
7b3e06a [Josh Rosen] Remove Streaming-specific setting to undo this change; 
update conf. guide
bf9094d [Josh Rosen] Revise disableOutputSpecValidation() comment to not refer 
to Spark Streaming.
e581d17 [Josh Rosen] Deduplicate isOutputSpecValidationEnabled logic.
762e473 [Josh Rosen] [SPARK-4835] Disable validateOutputSpecs for Spark 
Streaming jobs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/939ba1f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/939ba1f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/939ba1f8

Branch: refs/heads/master
Commit: 939ba1f8f6e32fef9026cc43fce55b36e4b9bfd1
Parents: e767d7d
Author: Josh Rosen <[email protected]>
Authored: Sun Jan 4 20:26:18 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Sun Jan 4 20:26:18 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 19 +++++++++-
 docs/configuration.md                           |  4 +-
 .../spark/streaming/dstream/DStream.scala       | 10 ++++-
 .../streaming/dstream/TransformedDStream.scala  |  2 +-
 .../streaming/scheduler/JobScheduler.scala      |  8 +++-
 .../spark/streaming/CheckpointSuite.scala       | 39 ++++++++++++++++++++
 6 files changed, 75 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4469c89..f8df5b2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.DynamicVariable
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -964,7 +965,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -1042,7 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " 
+
       valueClass.getSimpleName + ")")
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1117,8 +1118,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   private[spark] def valueClass: Class[_] = vt.runtimeClass
 
   private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
+
+  // Note: this needs to be a function instead of a 'val' so that the 
disableOutputSpecValidation
+  // setting can take effect:
+  private def isOutputSpecValidationEnabled: Boolean = {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    val enabledInConf = 
self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+    enabledInConf && !validationDisabled
+  }
 }
 
 private[spark] object PairRDDFunctions {
   val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+  /**
+   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled 
on a case-by-case
+   * basis; see SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new 
DynamicVariable[Boolean](false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fa9d311..9bb6499 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -709,7 +709,9 @@ Apart from these, the following properties are also 
available, and may be useful
     <td>If set to true, validates the output specification (e.g. checking if 
the output directory already exists)
     used in saveAsHadoopFile and other variants. This can be disabled to 
silence exceptions due to pre-existing
     output directories. We recommend that users do not disable this except if 
trying to achieve compatibility with
-    previous versions of Spark. Simply use Hadoop's FileSystem API to delete 
output directories by hand.</td>
+    previous versions of Spark. Simply use Hadoop's FileSystem API to delete 
output directories by hand.
+    This setting is ignored for jobs generated through Spark Streaming's 
StreamingContext, since
+    data may need to be rewritten to pre-existing output directories during 
checkpoint recovery.</td>
 </tr>
 <tr>
     <td><code>spark.hadoop.cloneConf</code></td>

http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 28fc00c..b874f56 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] (
         // set this DStream's creation site, generate RDDs and then restore 
the previous call site.
         val prevCallSite = ssc.sparkContext.getCallSite()
         ssc.sparkContext.setCallSite(creationSite)
-        val rddOption = compute(time)
+        // Disable checks for existing output directories in jobs launched by 
the streaming
+        // scheduler, since we may need to write output to an existing 
directory during checkpoint
+        // recovery; see SPARK-4835 for more details. We need to have this 
call here because
+        // compute() might cause Spark jobs to be launched.
+        val rddOption = 
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          compute(time)
+        }
         ssc.sparkContext.setCallSite(prevCallSite)
 
         rddOption.foreach { case newRDD =>

http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 7cd4554..71b6185 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8..0e0f5bd 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import akka.actor.{ActorRef, Actor, Props}
 import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
 
 
@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends 
Logging {
   private class JobHandler(job: Job) extends Runnable {
     def run() {
       eventActor ! JobStarted(job)
-      job.run()
+      // Disable checks for existing output directories in jobs launched by 
the streaming scheduler,
+      // since we may need to write output to an existing directory during 
checkpoint recovery;
+      // see SPARK-4835 for more details.
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+        job.run()
+      }
       eventActor ! JobCompleted(job)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/939ba1f8/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 72d055e..5d232c6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+    // Regression test for SPARK-4835.
+    //
+    // In that issue, the problem was that `saveAsHadoopFile(s)` would fail 
when the last batch
+    // was restarted from a checkpoint since the output directory would 
already exist.  However,
+    // the other saveAsHadoopFile* tests couldn't catch this because they only 
tested whether the
+    // output matched correctly and not whether the post-restart batch had 
successfully finished
+    // without throwing any errors.  The following test reproduces the same 
bug with a test that
+    // actually fails because the error in saveAsHadoopFile causes transform() 
to fail, which
+    // prevents the expected output from being written to the output stream.
+    //
+    // This is not actually a valid use of transform, but it's being used here 
so that we can test
+    // the fix for SPARK-4835 independently of additional test cleanup.
+    //
+    // After SPARK-5079 is addressed, should be able to remove this test since 
a strengthened
+    // version of the other saveAsHadoopFile* tests would prevent regressions 
for this issue.
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), 
Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          s.transform { (rdd, time) =>
+            val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+            output.saveAsHadoopFile(
+              new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+              classOf[Text],
+              classOf[IntWritable],
+              classOf[TextOutputFormat[Text, IntWritable]])
+            output
+          }
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 
1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as 
reliable,
   // replayable input source - TestInputDStream.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to