Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f07a590c -> 0d57a4ae1


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (one thread updating the JobConf while another thread 
serializes it) can lead to a ConcurrentModificationException in the underlying 
Java hash map used inside the internal Hadoop Configuration object.
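As a rough, nondeterministic illustration of the race (not part of this commit; 
object and key names are made up), one thread mutates the conf the way 
`RDD.saveAsHadoopFile()` does while another serializes it the way checkpointing 
does via `Configuration.write()`:

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import org.apache.hadoop.mapred.JobConf

    object JobConfRaceSketch {
      def main(args: Array[String]): Unit = {
        val conf = new JobConf()

        // Mutator: stands in for RDD.saveAsHadoopFile() updating the conf.
        val mutator = new Thread(new Runnable {
          override def run(): Unit = {
            for (i <- 0 until 100000) conf.set(s"demo.key.$i", "value")
          }
        })

        // Serializer: stands in for checkpointing, which writes out the conf.
        // Configuration.write iterates the conf's property map, so a concurrent
        // set() can surface as a ConcurrentModificationException.
        val serializer = new Thread(new Runnable {
          override def run(): Unit = {
            for (_ <- 0 until 1000) {
              conf.write(new DataOutputStream(new ByteArrayOutputStream()))
            }
          }
        })

        mutator.start(); serializer.start()
        mutator.join(); serializer.join()
      }
    }

Whether the exception is actually thrown depends on thread interleaving, which 
is why the reliable reproduction lives in the tests added in #9988.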

The solution is to create a new JobConf for every batch, which is the one 
updated by `RDD.saveAsHadoopFile()`, while checkpointing serializes the 
original JobConf.
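In code, this amounts to handing each batch a defensive copy built with the 
`new JobConf(conf)` copy constructor. A minimal standalone sketch of the 
pattern (the `demo.*` keys and object name are made up for illustration):

    import org.apache.hadoop.mapred.JobConf

    object JobConfCopySketch {
      def main(args: Array[String]): Unit = {
        val original = new JobConf()             // conf that checkpointing serializes
        original.set("demo.shared.setting", "a")

        // A fresh copy is created for each batch; only the copy gets mutated.
        val perBatch = new JobConf(original)
        perBatch.set("demo.output.dir", "/tmp/batch-1")

        // The original stays untouched, so serializing it no longer races with
        // the per-batch mutations performed by RDD.saveAsHadoopFile().
        assert(original.get("demo.output.dir") == null)
        println(perBatch.get("demo.output.dir"))
      }
    }

The actual one-line change in `PairDStreamFunctions.scala` is shown in the diff 
below.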

Tests to be added in #9988 fail reliably without this patch. This patch is kept 
deliberately small so that it can be backported to previous branches.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d57a4ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d57a4ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d57a4ae

Branch: refs/heads/branch-1.5
Commit: 0d57a4ae10f4ec40386194bc3c8e27f32da09d4d
Parents: 4f07a59
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 21:05:18 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d57a4ae/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 71bec96..aa36997 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -692,7 +692,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     val serializableConf = new SerializableJobConf(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+        new JobConf(serializableConf.value))
     }
     self.foreachRDD(saveFunc)
   }

