Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f5af299ab -> b6ba2dab2

[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.

These concurrent accesses (one thread updating the JobConf while another thread is serializing it) can lead to a ConcurrentModificationException in the underlying Java hashmap used in the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, which is then updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf.

Tests to be added in #9988 will fail reliably without this patch. This patch is kept really small to make sure that it can be added to previous branches.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6ba2dab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6ba2dab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6ba2dab

Branch: refs/heads/branch-1.4
Commit: b6ba2dab26092f56271114aa62f25b2fc9d6adad
Parents: f5af299
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 21:05:37 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b6ba2dab/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c6..4e392f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -691,7 +691,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+        new JobConf(serializableConf.value))
     }
     self.foreachRDD(saveFunc)
   }
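
Illustration (not part of the patch): the copy-per-batch idea in the commit message can be sketched without Spark or Hadoop on the classpath. Everything below is an assumption made for the example: `FakeConf` is a hypothetical stand-in for `JobConf` backed by a plain `java.util.HashMap`, `JobConfCopyDemo` is a made-up name, and the "concurrent" update is simulated in a single thread by mutating the map partway through an iteration, so that the failure is deterministic. It is only meant to show why mutating a private copy leaves the object being serialized untouched.

```scala
import java.util.{ConcurrentModificationException, HashMap => JHashMap}

// Hypothetical stand-in for a Hadoop JobConf: a mutable bag of settings
// backed by a plain java.util.HashMap (an assumption for this sketch).
final class FakeConf private (private val props: JHashMap[String, String]) {
  def this() = this(new JHashMap[String, String]())

  // Copy constructor, analogous in spirit to `new JobConf(existingConf)`.
  def this(other: FakeConf) = this(new JHashMap[String, String](other.props))

  def set(key: String, value: String): Unit = { props.put(key, value); () }
  def get(key: String): Option[String] = Option(props.get(key))

  // Simulated "checkpoint serialization": walks every entry. `midway` runs
  // after the first entry and stands in for another thread doing work
  // while the serialization is still in progress.
  def serialize(midway: () => Unit): String = {
    val sb = new StringBuilder
    val it = props.entrySet().iterator()
    var first = true
    while (it.hasNext) {
      val e = it.next()
      sb.append(e.getKey).append('=').append(e.getValue).append(';')
      if (first) { midway(); first = false }
    }
    sb.toString
  }
}

object JobConfCopyDemo {
  private def baseConf(): FakeConf = {
    val c = new FakeConf()
    c.set("a", "1")
    c.set("b", "2")
    c
  }

  // Before the patch: the save step updates the same object that is being
  // serialized, so the fail-fast iterator detects the modification.
  def sharedConfFails(): Boolean = {
    val conf = baseConf()
    try {
      conf.serialize(() => conf.set("output.dir", "/batch-1"))
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  // After the patch: the save step updates a per-batch copy, so the
  // original can be serialized safely and stays unchanged.
  def perBatchCopySucceeds(): Boolean = {
    val conf = baseConf()
    val perBatch = new FakeConf(conf)
    conf.serialize(() => perBatch.set("output.dir", "/batch-1"))
    conf.get("output.dir").isEmpty &&
      perBatch.get("output.dir").contains("/batch-1")
  }
}
```

In this sketch `JobConfCopyDemo.sharedConfFails()` reports the exception for the shared object, while `JobConfCopyDemo.perBatchCopySucceeds()` shows the per-batch copy absorbing the update. In the real code the race is between threads and therefore timing-dependent; the single-threaded simulation only reproduces the interleaving.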