[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357952#comment-16357952 ]
huangtengfei commented on SPARK-23053:
--------------------------------------

The following is a repro case, for clarity:
{code:java}
/** Wrapped rdd partition. */
class WrappedPartition(val partition: Partition) extends Partition {
  def index: Int = partition.index
}

/**
 * An RDD whose partitions are WrappedPartitions.
 * The compute method casts the split to WrappedPartition; that cast is what this
 * test suite exercises.
 */
class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
  protected def getPartitions: Array[Partition] = {
    parent.partitions.map(p => new WrappedPartition(p))
  }

  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
  }
}
{code}
{code:java}
/**
 * This repro simulates two concurrent jobs using the same RDD, which is marked
 * for checkpointing:
 * Job one has already finished its Spark job and starts the doCheckpoint process;
 * Job two is then submitted, and submitMissingTasks is called.
 * In submitMissingTasks, if task serialization happens before doCheckpoint is done,
 * while the partitions are read from stage.rdd.partitions after doCheckpoint is done,
 * we may get a ClassCastException when the task executes, because some RDDs cast
 * their Partition to a concrete type.
 *
 * The point of this test is that task serialization and the partition calculation in
 * submitMissingTasks should see the same RDD checkpoint state.
 */
repro("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
  // Set checkpointDir.
  val checkpointDir = Utils.createTempDir()
  sc.setCheckpointDir(checkpointDir.toString)

  // Semaphores to control the process sequence for the two threads below.
  val doCheckpointStarted = new Semaphore(0)
  val taskBinaryBytesFinished = new Semaphore(0)
  val checkpointStateUpdated = new Semaphore(0)

  val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
  rdd.checkpoint()

  val checkpointRunnable = new Runnable {
    override def run() = {
      // Simulate what RDD.doCheckpoint() does here.
      rdd.doCheckpointCalled = true
      val checkpointData = rdd.checkpointData.get
      RDDCheckpointData.synchronized {
        if (checkpointData.cpState == CheckpointState.Initialized) {
          checkpointData.cpState = CheckpointState.CheckpointingInProgress
        }
      }

      val newRDD = checkpointData.doCheckpoint()

      // Release doCheckpointStarted after the job triggered by checkpointing has
      // finished, so that taskBinary serialization can start.
      doCheckpointStarted.release()

      // Wait until taskBinary serialization has finished in submitMissingTasksThread.
      taskBinaryBytesFinished.acquire()

      // Update our state and truncate the RDD lineage.
      RDDCheckpointData.synchronized {
        checkpointData.cpRDD = Some(newRDD)
        checkpointData.cpState = CheckpointState.Checkpointed
        rdd.markCheckpointed()
      }
      checkpointStateUpdated.release()
    }
  }

  val submitMissingTasksRunnable = new Runnable {
    override def run() = {
      // Simulate the process of submitMissingTasks.
      // Wait until the doCheckpoint job has finished running, but the checkpoint state
      // has not been updated yet.
      doCheckpointStarted.acquire()
      val ser = SparkEnv.get.closureSerializer.newInstance()

      // Simulate task serialization while submitMissingTasks is running.
      // The task is serialized while the rdd checkpoint is not yet finished.
      val cleanedFunc = sc.clean(Utils.getIteratorSize _)
      val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
      val taskBinaryBytes = JavaUtils.bufferToArray(
        ser.serialize((rdd, func): AnyRef))

      // Because the partition calculation is in a synchronized block, in the fixed code
      // the partition would be calculated here.
      val correctPart = rdd.partitions(0)

      // Release taskBinaryBytesFinished so that the checkpoint state can be changed to
      // Checkpointed in checkpointThread.
      taskBinaryBytesFinished.release()
      // Wait until the checkpoint state has changed to Checkpointed in checkpointThread.
      checkpointStateUpdated.acquire()

      // Now we're done simulating the interleaving that might happen within the
      // scheduler, we'll check to make sure the final state is OK by simulating a
      // couple of steps that normally happen on the executor.
      // This partition is calculated after the rdd checkpoint has already finished.
      val errPart = rdd.partitions(0)

      // taskBinary will be deserialized when the task runs on the executor.
      val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
        ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
      val taskContext = mock(classOf[TaskContext])
      doNothing().when(taskContext).killTaskIfInterrupted()

      // Make sure our test case is set up correctly -- we expect a ClassCastException
      // here if we use rdd.partitions from after checkpointing was done, but our binary
      // bytes are from before it finished.
      intercept[ClassCastException] {
        // Triggered when the task runs on the executor.
        taskRdd.iterator(errPart, taskContext)
      }

      // Executes successfully with correctPart.
      taskRdd.iterator(correctPart, taskContext)
    }
  }

  try {
    new Thread(checkpointRunnable).start()
    val submitMissingTasksThread = new Thread(submitMissingTasksRunnable)
    submitMissingTasksThread.start()
    submitMissingTasksThread.join()
  } finally {
    Utils.deleteRecursively(checkpointDir)
  }
}
{code}

> taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23053
>                 URL: https://issues.apache.org/jira/browse/SPARK-23053
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: huangtengfei
>            Priority: Major
>
> When concurrent jobs use the same RDD that is marked for checkpointing, one job may finish its Spark job and start RDD.doCheckpoint while another job is being submitted, at which point submitStage and submitMissingTasks are called. [submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961] serializes taskBinaryBytes and calculates the task partitions, both of which are affected by the checkpoint status. If the former is computed before doCheckpoint has finished while the latter is computed after it has finished, then when the task runs, rdd.compute is called, and RDDs with a particular partition type, such as [MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala], which cast their partitions, will throw a ClassCastException because the part parameter is actually a CheckpointRDDPartition.
> This error can occur because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization happens in the DAGScheduler's event loop.
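One possible direction for a fix, sketched below: have DAGScheduler.submitMissingTasks serialize the task binary and read stage.rdd.partitions under the same RDDCheckpointData lock, so that both observe an identical checkpoint state. This is only an illustration of the synchronization pattern, not the committed patch; the names stage, closureSerializer, taskBinaryBytes and partitions are assumed to be the existing locals in submitMissingTasks.
{code:java}
// Sketch only: inside DAGScheduler.submitMissingTasks, compute both values under the
// RDDCheckpointData lock so they cannot straddle a concurrent RDD.doCheckpoint().
var taskBinaryBytes: Array[Byte] = null
var partitions: Array[Partition] = null
RDDCheckpointData.synchronized {
  taskBinaryBytes = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  // Read the partitions under the same lock, so they match the bytes serialized above.
  partitions = stage.rdd.partitions
}
{code}
Locking on RDDCheckpointData here matches the lock that the checkpoint state transitions already take (as shown in checkpointRunnable in the repro above), so a doCheckpoint running in another thread cannot change the checkpoint state between the serialization and the partition read.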