Repository: spark
Updated Branches:
  refs/heads/branch-1.2 1e356a8fa -> a9944c809


[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles

Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles be Spark's Hadoop configuration
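
As a minimal sketch of the pattern the fix applies (the names below are illustrative, not taken from the patch): Hadoop's Configuration/JobConf is Writable but not java.io.Serializable, so a ForeachDStream closure that captures it directly cannot be serialized when the DStream graph is checkpointed; wrapping it in SerializableWritable makes the captured reference checkpoint-safe.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    // Sketch only: Configuration is Writable but not java.io.Serializable, so a
    // closure run by a ForeachDStream must not capture it directly if the
    // DStream graph is going to be checkpointed.
    val conf = new Configuration()
    val serializableConf = new SerializableWritable(conf)

    // The closure captures only the serializable wrapper and unwraps it when it runs.
    val saveFunc = (path: String) => {
      val hadoopConf = serializableConf.value
      // ... write to `path` using `hadoopConf` ...
    }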

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.

(cherry picked from commit 8838ad7c135a585cde015dc38b5cb23314502dd9)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9944c80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9944c80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9944c80

Branch: refs/heads/branch-1.2
Commit: a9944c809017cc61c9c2e38efe9d709dfb0a94cd
Parents: 1e356a8
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Nov 25 14:16:27 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Nov 25 14:41:30 2014 -0800

----------------------------------------------------------------------
 .../dstream/PairDStreamFunctions.scala          | 30 ++++++-----
 .../spark/streaming/CheckpointSuite.scala       | 56 +++++++++++++++++++-
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9944c80/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f405dda..806fe5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -17,20 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -625,11 +622,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -656,11 +655,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
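
For context, a hedged usage sketch of what the patched methods allow (the host, port, and paths below are made up): a checkpointed pair DStream saved with the new-API Hadoop output format, with the SparkContext's Hadoop configuration picked up by default.

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "SaveFilesExample", Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoint")  // checkpointing serializes the DStream graph

    val counts = ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // With this fix, no explicit Configuration is needed (Spark's Hadoop
    // configuration is the default) and the generated ForeachDStream survives
    // checkpoint serialization.
    counts.saveAsNewAPIHadoopFiles(
      "/tmp/streaming-output/result", "txt",
      classOf[Text], classOf[IntWritable],
      classOf[NewTextOutputFormat[Text, IntWritable]])

    ssc.start()
    ssc.awaitTermination()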

http://git-wip-us.apache.org/repos/asf/spark/blob/a9944c80/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 77ff1ca..c97998a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -22,9 +22,14 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
  // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
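
The last hunk makes the test helper tolerant of the extra ForeachDStream outputs that the new saveAs*Files tests register on the graph: it filters for the TestOutputStreamWithPartitions instead of assuming it is the first output stream. A generic sketch of that pattern (an illustrative helper, not part of the patch):

    import scala.reflect.ClassTag

    // Illustrative only: pick the first element of a given runtime type instead
    // of assuming it sits at the head of the collection.
    def firstOfType[T](xs: Seq[Any])(implicit ct: ClassTag[T]): T =
      xs.collectFirst { case t: T => t }.getOrElse(
        throw new NoSuchElementException(s"no ${ct.runtimeClass.getName} found"))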


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
