Repository: spark
Updated Branches:
  refs/heads/branch-1.0 dc18167ee -> 6d8f1dd15


[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)

This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`.  The 
underlying problem is that thread-safety issues in Hadoop Configuration objects 
can cause Spark tasks to get stuck in infinite loops.  The approach taken here 
is to clone the JobConf for each task rather than sharing a single copy across 
tasks.  Note that Configuration thread-safety issues may still affect the 
driver, but these seem much less likely to occur in practice and would be more 
complex to fix (see the discussion on the SPARK-2546 ticket).
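The flag-guarded, clone-per-task pattern described above can be sketched in plain Scala, using `java.util.Properties` as a stand-in for Hadoop's `Configuration` (the names `ConfCloneSketch`, `getConf`, and `CLONE_LOCK` are illustrative, not Spark's API):

```scala
import java.util.Properties

object ConfCloneSketch {
  // Stand-in for HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK: constructing
  // (and copying) configuration objects is serialized under a single lock
  // because the constructor itself is not thread-safe.
  private val CLONE_LOCK = new Object

  // If cloneConf is true, each caller (task) gets its own defensive copy of
  // the shared configuration; otherwise the shared instance is returned, as
  // was the behavior before this patch.
  def getConf(shared: Properties, cloneConf: Boolean): Properties =
    if (cloneConf) {
      CLONE_LOCK.synchronized {
        val copy = new Properties()
        copy.putAll(shared)
        copy
      }
    } else {
      shared
    }
}
```

Each cloned copy is independent, so a task mutating its own copy can no longer corrupt the iteration state of another task reading the shared object.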

This cloning is guarded by a new configuration option 
(`spark.hadoop.cloneConf`) and is disabled by default in order to avoid 
unexpected performance regressions for workloads that are unaffected by the 
Configuration thread-safety issues.
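For reference, opting in might look like the following (a hypothetical invocation; `MyJob` and `my-job.jar` are placeholders, the flag name is the one introduced by this patch):

```shell
# Opt in to per-task JobConf cloning (off by default):
spark-submit --conf spark.hadoop.cloneConf=true --class MyJob my-job.jar
```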

Author: Josh Rosen <[email protected]>

Closes #2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:

f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.

(cherry picked from commit 2cd40db2b3ab5ddcb323fd05c171dbd9025f9e71)
Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
        docs/configuration.md


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d8f1dd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d8f1dd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d8f1dd1

Branch: refs/heads/branch-1.0
Commit: 6d8f1dd15afdc7432b5721c89f9b2b402460322b
Parents: dc18167
Author: Josh Rosen <[email protected]>
Authored: Sun Oct 19 00:31:06 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sun Oct 19 00:33:11 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 53 ++++++++++++++------
 docs/configuration.md                           |  9 ++++
 2 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d8f1dd1/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d0a2241..99f1f2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -125,27 +125,47 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546).  This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable.  One
+      // solution, implemented here, is to clone the Configuration object.  Unfortunately, this
+      // clone can be very expensive.  To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
       HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
         val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
         newJobConf
       }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
+      }
     }
   }
 
@@ -231,7 +251,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6d8f1dd1/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 65b2adf..367b2fd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -495,6 +495,15 @@ Apart from these, the following properties are also available, and may be useful
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
+<tr>
+    <td><code>spark.hadoop.cloneConf</code></td>
+    <td>false</td>
+    <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task.  This
+    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+    This is disabled by default in order to avoid unexpected performance regressions for jobs that
+    are not affected by these issues.</td>
+</tr>
 </table>
 
 #### Networking

