[ 
https://issues.apache.org/jira/browse/SPARK-28917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-28917:
---------------------------------
    Description: 
{{RDD.dependencies}} caches its computed value, but the caching is not 
thread-safe.  This can lead to a race where the cached value gets overwritten, 
which leaves the DAGScheduler stuck in an inconsistent state.  In particular, 
this can happen when there is a race between the DAGScheduler event loop and 
another thread (e.g. a user thread, if there is multi-threaded job submission).


First, a job is submitted by the user, which then computes the result Stage and 
its parents:

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L983

Which eventually makes a call to {{rdd.dependencies}}:

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L519

At the same time, the user could also touch {{rdd.dependencies}} in another 
thread, which could overwrite the stored value because of the race.

Then the DAGScheduler checks the dependencies *again* later on in the job 
submission, via {{getMissingParentStages}}

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1025

Because it will find new dependencies, it will create entirely different 
stages.  Now the job has some orphaned stages which will never run.
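
The interleaving described above can be replayed deterministically in a small model. The sketch below is not Spark code: {{ModelRDD}}, {{ModelShuffleDependency}} and the split of the lazy getter into {{compute}} and {{store}} are hypothetical names that stand in for an unsynchronized check-then-store cache. That every freshly built dependency gets a new shuffle id is an assumption of the model.

```python
import itertools

# Model assumption: each newly built dependency is handed a fresh id.
_next_shuffle_id = itertools.count()


class ModelShuffleDependency:
    """Hypothetical stand-in for a shuffle dependency."""

    def __init__(self):
        self.shuffle_id = next(_next_shuffle_id)


class ModelRDD:
    """Hypothetical RDD whose dependency cache is filled without any locking."""

    def __init__(self):
        self.cached = None

    def compute(self):
        # Step 1 of the lazy getter: the cache looked empty, so build a new value.
        return [ModelShuffleDependency()]

    def store(self, deps):
        # Step 2 of the lazy getter: publish the value, overwriting whatever is there.
        self.cached = deps
        return self.cached


rdd = ModelRDD()

# Both threads see an empty cache and each computes its own value.
scheduler_deps = rdd.compute()
user_deps = rdd.compute()

# The scheduler stores its value and derives the parent stages from it.
parents_of_final_stage = [d.shuffle_id for d in rdd.store(scheduler_deps)]

# The user thread now finishes its own call and overwrites the cache.
rdd.store(user_deps)

# The scheduler reads the cache again while looking for missing parents.
missing_parents = [d.shuffle_id for d in rdd.cached]

print(parents_of_final_stage, missing_parents)
```

In this model the two reads by the scheduler return dependencies with different ids, which mirrors how the job ends up with one set of stages as parents and a different set as missing parents.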

The symptoms of this are seeing disjoint sets of stages in the "Parents of 
final stage" and the "Missing parents" messages on job submission.

(*EDIT*: Seeing repeated "Registering RDD X" messages is actually fine and is 
not a symptom of a problem at all.  It just means the RDD is the *input* to 
multiple shuffles.)

{noformat}
[INFO] 2019-08-15 23:22:31,570 org.apache.spark.SparkContext logInfo - Starting 
job: count at XXX.scala:462
...
[INFO] 2019-08-15 23:22:31,573 org.apache.spark.scheduler.DAGScheduler logInfo 
- Registering RDD 14 (repartition at XXX.scala:421)
...
...
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Got job 1 (count at XXX.scala:462) with 40 output partitions
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Final stage: ResultStage 5 (count at XXX.scala:462)
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Parents of final stage: List(ShuffleMapStage 4)
[INFO] 2019-08-15 23:22:31,599 org.apache.spark.scheduler.DAGScheduler logInfo 
- Registering RDD 14 (repartition at XXX.scala:421)
[INFO] 2019-08-15 23:22:31,599 org.apache.spark.scheduler.DAGScheduler logInfo 
- Missing parents: List(ShuffleMapStage 6)
{noformat}
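
A log check for the symptom above could look roughly like the following sketch. It assumes each log message sits on a single line (the excerpt above is wrapped by the mail client) and that a "Missing parents" line belongs to the most recent "Parents of final stage" line. The function names are hypothetical.

```python
import re

# Capture the body of the List(...) that follows each message.
_PARENTS = re.compile(r"Parents of final stage: List\((.*?)\)")
_MISSING = re.compile(r"Missing parents: List\((.*?)\)")
_STAGE_ID = re.compile(r"Stage (\d+)")


def stage_ids(list_body):
    """Extract the stage ids from the inside of a List(...) log fragment."""
    return {int(m) for m in _STAGE_ID.findall(list_body)}


def find_disjoint_submissions(lines):
    """Return (parents, missing) pairs whose stage sets share no element."""
    suspicious = []
    parents = None
    for line in lines:
        found = _PARENTS.search(line)
        if found:
            parents = stage_ids(found.group(1))
            continue
        found = _MISSING.search(line)
        if found and parents is not None:
            missing = stage_ids(found.group(1))
            # Only flag when both lists are non-empty and do not overlap.
            if parents and missing and parents.isdisjoint(missing):
                suspicious.append((parents, missing))
            parents = None
    return suspicious


sample = [
    "DAGScheduler logInfo - Parents of final stage: List(ShuffleMapStage 4)",
    "DAGScheduler logInfo - Registering RDD 14 (repartition at XXX.scala:421)",
    "DAGScheduler logInfo - Missing parents: List(ShuffleMapStage 6)",
]
print(find_disjoint_submissions(sample))
```

On the sample lines, which are taken from the excerpt above, the check reports stage 4 as parent and stage 6 as missing. A repeated "Registering RDD" line is deliberately ignored.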

Note that there is a similar issue with {{rdd.partitions}}. I don't see a way it 
could mess up the scheduler (it seems it's only used for 
{{rdd.partitions.length}}).  There is also an issue that {{rdd.storageLevel}} 
is read and cached in the scheduler, but it could be modified simultaneously by 
the user in another thread.  Similarly, I can't see a way it could affect the 
scheduler.

*WORKAROUND*:
(a) call {{rdd.dependencies}} while you know that RDD is only getting touched 
by one thread (eg. in the thread that created it, or before you submit multiple 
jobs touching that RDD from other threads). Then that value will get cached.
(b) don't submit jobs from multiple threads.
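
Why workaround (a) helps can be illustrated with a minimal model, again not Spark code. {{ModelRDD}} and {{submit_job}} are hypothetical, and the getter uses the same unsynchronized check-then-store pattern as the race described above. Once the value has been stored from a single thread, later callers only read it.

```python
import threading


class ModelRDD:
    """Hypothetical RDD with an unsynchronized, lazily filled dependency cache."""

    def __init__(self):
        self._deps = None

    def dependencies(self):
        if self._deps is None:
            # Unsynchronized check-then-store: unsafe if raced, safe once filled.
            self._deps = [object()]
        return self._deps


rdd = ModelRDD()

# Workaround (a): touch the value once while only this thread can see the RDD.
primed = rdd.dependencies()

seen = []
seen_lock = threading.Lock()


def submit_job():
    # Stand-in for a job submission; it only reads the already cached value.
    deps = rdd.dependencies()
    with seen_lock:
        seen.append(deps)


threads = [threading.Thread(target=submit_job) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_same = all(deps is primed for deps in seen)
print(all_same)
```

Every thread observes the object stored by the priming call, because the threads are started only after the cache has been filled. In a Spark application the priming step corresponds to the {{rdd.dependencies}} call named in (a).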

  was:
{{RDD.dependencies}} stores the precomputed cache value, but it is not 
thread-safe.  This can lead to a race where the value gets overwritten, but the 
DAGScheduler gets stuck in an inconsistent state.  In particular, this can 
happen when there is a race between the DAGScheduler event loop, and another 
thread (eg. a user thread, if there is multi-threaded job submission).


First, a job is submitted by the user, which then computes the result Stage and 
its parents:

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L983

Which eventually makes a call to {{rdd.dependencies}}:

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L519

At the same time, the user could also touch {{rdd.dependencies}} in another 
thread, which could overwrite the stored value because of the race.

Then the DAGScheduler checks the dependencies *again* later on in the job 
submission, via {{getMissingParentStages}}

https://github.com/apache/spark/blob/24655583f1cb5dae2e80bb572604fb4a9761ec07/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1025

Because it will find new dependencies, it will create entirely different 
stages.  Now the job has some orphaned stages which will never run.

The symptoms of this are seeing disjoint sets of stages in the "Parents of 
final stage" and the "Missing parents" messages on job submission, as well as 
seeing repeated messages "Registered RDD X" for the same RDD id.  eg:

{noformat}
[INFO] 2019-08-15 23:22:31,570 org.apache.spark.SparkContext logInfo - Starting 
job: count at XXX.scala:462
...
[INFO] 2019-08-15 23:22:31,573 org.apache.spark.scheduler.DAGScheduler logInfo 
- Registering RDD 14 (repartition at XXX.scala:421)
...
...
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Got job 1 (count at XXX.scala:462) with 40 output partitions
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Final stage: ResultStage 5 (count at XXX.scala:462)
[INFO] 2019-08-15 23:22:31,582 org.apache.spark.scheduler.DAGScheduler logInfo 
- Parents of final stage: List(ShuffleMapStage 4)
[INFO] 2019-08-15 23:22:31,599 org.apache.spark.scheduler.DAGScheduler logInfo 
- Registering RDD 14 (repartition at XXX.scala:421)
[INFO] 2019-08-15 23:22:31,599 org.apache.spark.scheduler.DAGScheduler logInfo 
- Missing parents: List(ShuffleMapStage 6)
{noformat}

Note that there is a similar issue with {{rdd.partitions}}. I don't see a way it 
could mess up the scheduler (it seems it's only used for 
{{rdd.partitions.length}}).  There is also an issue that {{rdd.storageLevel}} 
is read and cached in the scheduler, but it could be modified simultaneously by 
the user in another thread.  Similarly, I can't see a way it could affect the 
scheduler.

*WORKAROUND*:
(a) call {{rdd.dependencies}} while you know that RDD is only getting touched 
by one thread (eg. in the thread that created it, or before you submit multiple 
jobs touching that RDD from other threads). Then that value will get cached.
(b) don't submit jobs from multiple threads.


> Jobs can hang because of race of RDD.dependencies
> -------------------------------------------------
>
>                 Key: SPARK-28917
>                 URL: https://issues.apache.org/jira/browse/SPARK-28917
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.3.3, 2.4.3
>            Reporter: Imran Rashid
>            Priority: Major


