[ https://issues.apache.org/jira/browse/SPARK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-4196:
-----------------------------
    Summary: Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration  (was: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?)

More info. The problem is that {{CheckpointWriter}} serializes the {{DStreamGraph}} when checkpointing is enabled. In the case of, for example, {{saveAsNewAPIHadoopFiles}}, the graph includes a {{ForEachDStream}} whose function holds a reference to a Hadoop {{Configuration}}. This isn't a problem without checkpointing, because Spark does not generally need to serialize this {{ForEachDStream}} closure in order to execute it. But it does need to serialize it to checkpoint it. Does that make sense?

I'm not sure what to do, but this is a significant problem for me: I can't see a workaround that lets streaming, saving Hadoop files, and checkpointing work together.

Here's a cobbled-together test that shows the problem:

{code}
test("recovery with save to HDFS stream") {
  // Set up the streaming context and input streams
  val testDir = Utils.createTempDir()
  val outDir = Utils.createTempDir()
  var ssc = new StreamingContext(master, framework, Seconds(1))
  ssc.checkpoint(checkpointDir)
  val fileStream = ssc.textFileStream(testDir.toString)
  for (i <- Seq(1, 2, 3)) {
    Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    // wait to make sure that the file is written such that it gets shown in the file listings
  }
  val reducedStream = fileStream.map(x => (x, x)).saveAsNewAPIHadoopFiles(
    outDir.toURI.toString,
    "saveAsNewAPIHadoopFilesTest",
    classOf[Text],
    classOf[Text],
    classOf[TextOutputFormat[Text,Text]],
    ssc.sparkContext.hadoopConfiguration)

  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop()

  val checkpointDirFile = new File(checkpointDir)
  assert(outDir.listFiles().length > 0)
  assert(checkpointDirFile.listFiles().length == 1)
  assert(checkpointDirFile.listFiles()(0).listFiles().length > 0)

  Utils.deleteRecursively(testDir)
  Utils.deleteRecursively(outDir)
}
{code}

You'll see the {{NotSerializableException}} clearly if you hack {{Checkpoint.write()}}:

{code}
  def write(checkpoint: Checkpoint) {
    val bos = new ByteArrayOutputStream()
    val zos = compressionCodec.compressedOutputStream(bos)
    val oos = new ObjectOutputStream(zos)
    try {
      oos.writeObject(checkpoint)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    }
    ...
{code}

> Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4196
>                 URL: https://issues.apache.org/jira/browse/SPARK-4196
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Sean Owen
>
> I am reasonably sure there is some issue here in Streaming and that I'm not
> missing something basic, but not 100%. I went ahead and posted it as a JIRA
> to track, since it's come up a few times before without resolution, and right
> now I can't get checkpointing to work at all.
> When Spark Streaming checkpointing is enabled, I see a
> NotSerializableException thrown for a Hadoop Configuration object, and it
> seems like it is not one from my user code.
> Before I post my particular instance, see
> http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408135046777-12202.p...@n3.nabble.com%3E
> for another occurrence.
> I was also on a customer site last week debugging an identical issue with
> checkpointing in a Scala-based program, and they also could not enable
> checkpointing without hitting exactly this error.
> The essence of my code is:
> {code}
>     final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
>     JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
>       }
>     };
>     streamingContext = JavaStreamingContext.getOrCreate(
>         checkpointDirString, sparkContext.hadoopConfiguration(),
>         streamingContextFactory, false);
>     streamingContext.checkpoint(checkpointDirString);
> {code}
> It yields:
> {code}
> 2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
> org.apache.hadoop.conf.Configuration
>     - field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
>     - object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>)
>     - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
>     - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
>     ...
> {code}
> This looks like it's due to PairDStreamFunctions, as this saveFunc seems
> to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:
> {code}
> def saveAsNewAPIHadoopFiles(
>     prefix: String,
>     suffix: String,
>     keyClass: Class[_],
>     valueClass: Class[_],
>     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>     conf: Configuration = new Configuration
>   ) {
>   val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
>     val file = rddToFileName(prefix, suffix, time)
>     rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
>   }
>   self.foreachRDD(saveFunc)
> }
> {code}
> Is that not a problem? But then I don't know how it would ever work in Spark.
> But then again I don't see why this is an issue only when checkpointing
> is enabled.
> Long-shot, but I wonder if it is related to closure issues like
> https://issues.apache.org/jira/browse/SPARK-1866



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
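
The failure mode discussed in this message can be reproduced in miniature with plain Java serialization, without Spark or Hadoop on the classpath. The sketch below is illustrative only and is not Spark code: {{FakeConfiguration}}, {{SaveFunc}}, {{SerializableConf}}, and {{WrappedSaveFunc}} are hypothetical stand-ins for a Hadoop {{Configuration}}, the save function held by {{ForEachDStream}}, and one possible serializable holder. It assumes the configuration can be rebuilt from its key/value pairs. Whether a holder like this is the right fix for Spark itself is not settled by this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class CheckpointSerializationSketch {

    // Hypothetical stand-in for a Hadoop Configuration: holds settings
    // and deliberately does NOT implement Serializable.
    static class FakeConfiguration {
        final Map<String, String> props = new HashMap<>();
    }

    // Hypothetical stand-in for the save function: serializable itself,
    // but it captures the non-serializable configuration directly.
    static class SaveFunc implements Serializable {
        private static final long serialVersionUID = 1L;
        final FakeConfiguration conf;
        SaveFunc(FakeConfiguration conf) { this.conf = conf; }
    }

    // Hypothetical holder: keeps the configuration in a transient field and
    // writes/reads its key/value pairs by hand during serialization.
    static class SerializableConf implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeConfiguration conf;
        SerializableConf(FakeConfiguration conf) { this.conf = conf; }
        FakeConfiguration value() { return conf; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(new HashMap<>(conf.props));
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            conf = new FakeConfiguration();
            conf.props.putAll((Map<String, String>) in.readObject());
        }
    }

    // Same save function, but capturing the holder instead of the raw configuration.
    static class WrappedSaveFunc implements Serializable {
        private static final long serialVersionUID = 1L;
        final SerializableConf conf;
        WrappedSaveFunc(SerializableConf conf) { this.conf = conf; }
    }

    // Serialize an object graph to bytes, as a checkpoint writer would.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeConfiguration conf = new FakeConfiguration();
        conf.props.put("example.key", "example.value");

        // Direct capture: serialization fails on the configuration field.
        try {
            serialize(new SaveFunc(conf));
            System.out.println("unexpected: direct capture serialized");
        } catch (NotSerializableException e) {
            System.out.println("direct capture failed: " + e.getMessage());
        }

        // Wrapped capture: the holder survives a serialize/deserialize round trip.
        WrappedSaveFunc restored = (WrappedSaveFunc) deserialize(
                serialize(new WrappedSaveFunc(new SerializableConf(conf))));
        System.out.println("wrapped capture restored: " + restored.conf.value().props);
    }
}
```

The first case mirrors the trace quoted above, where a function object is serializable but one of its captured fields is not. The second case shows that the function only needs its captured field to be serializable, not the configuration class itself.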