[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357952#comment-16357952 ]

huangtengfei commented on SPARK-23053:
--------------------------------------

For clarity, here is a repro case:
{code:java}
/** Wrapped rdd partition. */
class WrappedPartition(val partition: Partition) extends Partition {
  def index: Int = partition.index
}

/**
 * An RDD with a particular defined Partition type, which is WrappedPartition.
 * The compute method casts the split to WrappedPartition. This cast operation is
 * used in this test suite.
 */
class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
  protected def getPartitions: Array[Partition] = {
    parent.partitions.map(p => new WrappedPartition(p))
  }

  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
  }
}
{code}
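For context, a hypothetical illustration (not part of the repro) of why this cast matters: once doCheckpoint() completes, the partitions of the checkpointed rdd are no longer WrappedPartitions, so a copy of the rdd serialized before the checkpoint cannot safely receive a partition computed after it. This sketch assumes a test SparkContext sc with a checkpoint dir set, as in the repro below.
{code:java}
// Sketch only, assuming a local SparkContext `sc` with a checkpoint dir already set.
val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
rdd.checkpoint()

// Before checkpointing, getPartitions wraps every parent partition.
assert(rdd.partitions.head.isInstanceOf[WrappedPartition])

// SparkContext.runJob calls rdd.doCheckpoint() on the driver thread after the job,
// which truncates the lineage; rdd.partitions is then recomputed from the checkpoint
// RDD and no longer contains WrappedPartitions, so the asInstanceOf in
// WrappedRDD.compute would fail for such a partition.
rdd.count()
assert(!rdd.partitions.head.isInstanceOf[WrappedPartition])
{code}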
{code:java}
/**
 * In this repro, we simulate a scenario where concurrent jobs use the same
 * rdd, which is marked for checkpointing:
 * Job one has already finished its Spark job and starts the doCheckpoint process;
 * Job two is submitted, and submitMissingTasks is called.
 * In submitMissingTasks, if task serialization happens before doCheckpoint is done,
 * while the partitions are computed from stage.rdd.partitions after doCheckpoint is
 * done, we may get a ClassCastException when executing the task, because some rdds
 * cast the Partition to their own partition type in compute.
 *
 * This test case indicates that task serialization and partition computation in
 * submitMissingTasks should be done with the same rdd checkpoint status.
 */
repro("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
  // set checkpointDir.
  val checkpointDir = Utils.createTempDir()
  sc.setCheckpointDir(checkpointDir.toString)

  // Semaphores to control the process sequence for the two threads below.
  val doCheckpointStarted = new Semaphore(0)
  val taskBinaryBytesFinished = new Semaphore(0)
  val checkpointStateUpdated = new Semaphore(0)

  val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
  rdd.checkpoint()

  val checkpointRunnable = new Runnable {
    override def run() = {
      // Simulate what RDD.doCheckpoint() does here.
      rdd.doCheckpointCalled = true
      val checkpointData = rdd.checkpointData.get
      RDDCheckpointData.synchronized {
        if (checkpointData.cpState == CheckpointState.Initialized) {
          checkpointData.cpState = CheckpointState.CheckpointingInProgress
        }
      }

      val newRDD = checkpointData.doCheckpoint()

      // Release doCheckpointStarted after the job triggered by the checkpoint has
      // finished, so that taskBinary serialization can start.
      doCheckpointStarted.release()
      // Wait until taskBinary serialization has finished in submitMissingTasksThread.
      taskBinaryBytesFinished.acquire()

      // Update our state and truncate the RDD lineage.
      RDDCheckpointData.synchronized {
        checkpointData.cpRDD = Some(newRDD)
        checkpointData.cpState = CheckpointState.Checkpointed
        rdd.markCheckpointed()
      }
      checkpointStateUpdated.release()
    }
  }

  val submitMissingTasksRunnable = new Runnable {
    override def run() = {
      // Simulate the process of submitMissingTasks.
      // Wait until the doCheckpoint job has finished running, but the checkpoint
      // status has not changed yet.
      doCheckpointStarted.acquire()

      val ser = SparkEnv.get.closureSerializer.newInstance()

      // Simulate task serialization while submitMissingTasks.
      // The task is serialized while the rdd checkpoint has not yet finished.
      val cleanedFunc = sc.clean(Utils.getIteratorSize _)
      val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
      val taskBinaryBytes = JavaUtils.bufferToArray(
        ser.serialize((rdd, func): AnyRef))
      // Because the partition computation is inside a synchronized block in the fixed
      // code, the partition is calculated here.
      val correctPart = rdd.partitions(0)

      // Release taskBinaryBytesFinished so that changing the checkpoint status to
      // Checkpointed can be done in checkpointThread.
      taskBinaryBytesFinished.release()
      // Wait until the checkpoint status has changed to Checkpointed in checkpointThread.
      checkpointStateUpdated.acquire()

      // Now that we're done simulating the interleaving that might happen within the
      // scheduler, we check that the final state is OK by simulating a couple of steps
      // that normally happen on the executor.
      // The partition is calculated after the rdd checkpoint has already finished.
      val errPart = rdd.partitions(0)

      // The taskBinary is deserialized when the task runs on the executor.
      val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
        ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)

      val taskContext = mock(classOf[TaskContext])
      doNothing().when(taskContext).killTaskIfInterrupted()

      // Make sure our test case is set up correctly -- we expect a ClassCastException
      // here if we use rdd.partitions from after checkpointing was done, but our
      // binary bytes are from before it finished.
      intercept[ClassCastException] {
        // Triggered when the task runs on the executor.
        taskRdd.iterator(errPart, taskContext)
      }

      // Execute successfully with correctPart.
      taskRdd.iterator(correctPart, taskContext)
    }
  }

  try {
    new Thread(checkpointRunnable).start()
    val submitMissingTasksThread = new Thread(submitMissingTasksRunnable)
    submitMissingTasksThread.start()
    submitMissingTasksThread.join()
  } finally {
    Utils.deleteRecursively(checkpointDir)
  }
}
{code}
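To make the point concrete, here is a rough sketch (not the actual patch) of the kind of change suggested above: in DAGScheduler.submitMissingTasks, serialize taskBinaryBytes and read stage.rdd.partitions under the same RDDCheckpointData lock, so both observe the same checkpoint state. Names such as closureSerializer and the stage match are approximations of the surrounding DAGScheduler code.
{code:java}
// Sketch only: take the RDDCheckpointData lock around both steps so that a concurrent
// doCheckpoint() cannot change the rdd's checkpoint state between them.
var taskBinaryBytes: Array[Byte] = null
var partitions: Array[Partition] = null
RDDCheckpointData.synchronized {
  taskBinaryBytes = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  // Read the partitions inside the same lock so they match the serialized rdd.
  partitions = stage.rdd.partitions
}
{code}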
 

> taskBinarySerialization and task partitions calculate in 
> DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23053
>                 URL: https://issues.apache.org/jira/browse/SPARK-23053
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: huangtengfei
>            Priority: Major
>
> When we run concurrent jobs using the same rdd that is marked for checkpointing, 
> one job may finish running and start the process of RDD.doCheckpoint while 
> another job is submitted, in which case submitStage and submitMissingTasks will 
> be called. 
> [submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961]
>  serializes taskBinaryBytes and calculates the task partitions, both of which are 
> affected by the checkpoint status. If the former is computed before doCheckpoint 
> has finished while the latter is computed after it has finished, then when the 
> task runs, rdd.compute is called, and rdds with a particular partition type such 
> as 
> [MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala],
>  which cast the partition to their own type, will throw a ClassCastException 
> because the partition passed in is actually a CheckpointRDDPartition.
> This error occurs because rdd.doCheckpoint runs in the same thread that called 
> sc.runJob, while the task serialization happens in the DAGScheduler's event loop.


