Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1c81c0c62 -> dbb1b399b


[SPARK-23053][CORE] taskBinarySerialization and task partitions calculated in
DAGScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When concurrent jobs use the same RDD and that RDD is marked for
checkpointing, the following race can occur. One job finishes and starts
RDD.doCheckpoint while another job is submitted, so submitStage and
submitMissingTasks are called for the second job.
[submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961)
serializes taskBinaryBytes and computes the task partitions, and both are
affected by the checkpoint status. If taskBinaryBytes is computed before
doCheckpoint finishes and the partitions are computed after it finishes, the
two no longer match when the task runs and rdd.compute is called. RDDs that
cast their partition to a specific type, such as
[UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala),
then throw a ClassCastException because the partition passed in is actually a
CheckpointRDDPartition.
The error occurs because rdd.doCheckpoint runs in the same thread that called
sc.runJob, while task serialization runs in the DAGScheduler's event loop.
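
The mismatch described above can be sketched without Spark. The following is a
toy model only: `ToyRdd`, `TaskBinary`, `UnionPartition`, `CheckpointPartition`
and `CheckpointRaceDemo` are hypothetical names invented for illustration and
are not Spark classes. It assumes the serialized task keeps the pre-checkpoint
compute logic while the partitions are read after the checkpoint completes.

```scala
// Toy model of the race; none of these types are Spark's.
trait Partition { def index: Int }
final case class UnionPartition(index: Int) extends Partition
final case class CheckpointPartition(index: Int) extends Partition

// Stand-in for an RDD whose partition type changes once checkpointing completes.
final class ToyRdd {
  @volatile private var checkpointed = false
  def markCheckpointed(): Unit = checkpointed = true
  def isCheckpointed: Boolean = checkpointed
  def partitions: Array[Partition] =
    if (checkpointed) Array(CheckpointPartition(0)) else Array(UnionPartition(0))
}

// Stand-in for the serialized task binary: it remembers which compute logic
// the RDD had at the moment it was serialized.
final class TaskBinary(checkpointedAtSerialization: Boolean) {
  def compute(part: Partition): String =
    if (checkpointedAtSerialization) s"read checkpoint ${part.index}"
    else {
      // Cast to the partition type this RDD expects, as a union-style RDD would.
      val p = part.asInstanceOf[UnionPartition]
      s"computed union partition ${p.index}"
    }
}

object CheckpointRaceDemo {
  // Returns true if running the task hits the ClassCastException.
  def runInterleaved(): Boolean = {
    val rdd = new ToyRdd
    val binary = new TaskBinary(rdd.isCheckpointed) // "serialized" before checkpoint
    rdd.markCheckpointed()                          // other job finishes its checkpoint
    val part = rdd.partitions(0)                    // partitions read after checkpoint
    try { binary.compute(part); false }
    catch { case _: ClassCastException => true }
  }

  def main(args: Array[String]): Unit =
    println(s"mismatch triggers ClassCastException: ${runInterleaved()}")
}
```

If both values are taken from the same checkpoint state, the cast in this model
never sees a partition type it does not expect.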

## How was this patch tested?

Existing unit tests, plus a new test case in DAGSchedulerSuite that reproduces
the exception.
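
As a rough illustration of the property such a check depends on, the sketch
below reads two checkpoint-dependent values under one shared lock while another
thread flips the checkpoint state. It is not the test added to
DAGSchedulerSuite. `ConsistentSnapshotDemo`, `ToyRdd`, `checkpointLock` and
`snapshot` are hypothetical names, and the model assumes the checkpointing
thread updates its state under the same lock the reader takes.

```scala
import java.util.concurrent.CountDownLatch

object ConsistentSnapshotDemo {
  // Hypothetical shared lock, playing the role that
  // RDDCheckpointData.synchronized plays in the patch.
  private val checkpointLock = new Object

  final class ToyRdd {
    private var checkpointed = false
    // The writer flips the state while holding the shared lock.
    def doCheckpoint(): Unit = checkpointLock.synchronized { checkpointed = true }
    // Both values depend on the checkpoint state; call them only under the lock.
    def serializedForm: String = if (checkpointed) "checkpointed" else "original"
    def partitionKind: String = if (checkpointed) "checkpointed" else "original"
  }

  // Reads both values under the lock so they always reflect one state.
  def snapshot(rdd: ToyRdd): (String, String) = checkpointLock.synchronized {
    (rdd.serializedForm, rdd.partitionKind)
  }

  // Races a checkpointing thread against the reader for a number of rounds.
  def allSnapshotsConsistent(rounds: Int): Boolean =
    (1 to rounds).forall { _ =>
      val rdd = new ToyRdd
      val start = new CountDownLatch(1)
      val writer = new Thread(() => { start.await(); rdd.doCheckpoint() })
      writer.start()
      start.countDown()
      val (binary, parts) = snapshot(rdd)
      writer.join()
      binary == parts
    }

  def main(args: Array[String]): Unit =
    println(s"all snapshots consistent: ${allSnapshotsConsistent(1000)}")
}
```

Either order of the two threads is acceptable in this model; what matters is
that the reader never observes one value from before the checkpoint and the
other from after it.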

Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>

Closes #20244 from ivoson/branch-taskpart-mistype.

(cherry picked from commit 091a000d27f324de8c5c527880854ecfcf5de9a4)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbb1b399
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbb1b399
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbb1b399

Branch: refs/heads/branch-2.3
Commit: dbb1b399b6cf8372a3659c472f380142146b1248
Parents: 1c81c0c
Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>
Authored: Tue Feb 13 09:59:21 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Feb 13 10:00:05 2018 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbb1b399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 199937b..8c46a84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1016,15 +1016,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
+      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+          case stage: ResultStage =>
+            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+        }
+
+        partitions = stage.rdd.partitions
       }
 
       taskBinary = sc.broadcast(taskBinaryBytes)
@@ -1049,7 +1058,7 @@ class DAGScheduler(
           stage.pendingPartitions.clear()
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
-            val part = stage.rdd.partitions(id)
+            val part = partitions(id)
             stage.pendingPartitions += id
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
@@ -1059,7 +1068,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           partitionsToCompute.map { id =>
             val p: Int = stage.partitions(id)
-            val part = stage.rdd.partitions(p)
+            val part = partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptNumber,
               taskBinary, part, locs, id, properties, serializedTaskMetrics,

