Repository: spark Updated Branches: refs/heads/branch-1.0 57526e40a -> bc8d24ac1
Workaround in Spark for ConcurrentModification issue (JIRA Hadoop-10456, Spark-1097) This fix has gone into Hadoop 2.4.1. For developers using < 2.4.1, it would be good to have a workaround in Spark as well. Fix has been tested for performance as well, no regressions found. Author: nravi <[email protected]> Closes #1000 from nishkamravi2/master and squashes the following commits: eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456) 6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed) 5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456) 681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles (cherry picked from commit 70c8116c0aecba293234edc44a7f8e58e5008649) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc8d24ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc8d24ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc8d24ac Branch: refs/heads/branch-1.0 Commit: bc8d24ac1d8d20d359b32f6a965d55a4052b72bc Parents: 57526e4 Author: nravi <[email protected]> Authored: Fri Jun 13 10:52:21 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Jun 13 10:52:41 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bc8d24ac/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6547755..2aa111d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -139,10 +139,13 @@ class HadoopRDD[K, V]( // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - val newJobConf = new JobConf(broadcastedConf.value.value) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf + // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) + broadcastedConf.synchronized { + val newJobConf = new JobConf(broadcastedConf.value.value) + initLocalJobConfFuncOpt.map(f => f(newJobConf)) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + newJobConf + } } }
