I think Chang is right, but I also think this only comes up in limited
scenarios.  I initially thought it wasn't a bug, but after some more
thought I have some concerns, in light of the issues we've had with
nondeterministic RDDs, e.g. repartition().

Say I have code like this:

val cachedRDD = sc.textFile(...).cache()
(0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }

that is, my cached RDD is referenced by many threads concurrently, before
the RDD has actually been cached.

When one of those tasks gets to cachedRDD.getOrCompute(), there are a few
possible scenarios:

1) The partition has never been referenced before.
BlockManager.getOrElseUpdate() will see that the block doesn't exist, so
the task will compute it itself (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360
)
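
To make that shape concrete, here is a minimal sketch of the
check-then-compute pattern (my own simplification, not Spark's actual
implementation), with a plain concurrent map standing in for the block
store:

import scala.collection.concurrent.TrieMap

object CacheMissSketch {
  // Stand-in for the block store; values are the materialized partition.
  val cache = TrieMap.empty[String, Vector[Int]]

  def getOrElseUpdate(blockId: String)(compute: => Vector[Int]): Vector[Int] =
    cache.get(blockId) match {
      case Some(values) => values      // already materialized (scenarios 2/3)
      case None =>
        val values = compute           // scenario 1: block unknown, compute it
        cache.putIfAbsent(blockId, values)
        cache(blockId)                 // return whichever copy won the race
    }
}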

2) The partition has been fully materialized by another task, and the
BlockManagerMaster on the driver already knows about it, so
BlockManager.getOrElseUpdate() will return a reference to the cached block
(perhaps on another node).
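
As a rough mental model (just an assumption about the shape, not the real
BlockManagerMaster API), the driver-side bookkeeping behaves like a map
from block id to the set of executors holding a materialized copy:

import scala.collection.concurrent.TrieMap

object BlockRegistrySketch {
  // blockId -> ids of executors that hold a fully materialized copy
  private val locations = TrieMap.empty[String, Set[String]]

  // An executor reports a block once it is fully materialized.
  def reportBlock(blockId: String, executorId: String): Unit = synchronized {
    locations.put(blockId, locations.getOrElse(blockId, Set.empty) + executorId)
  }

  // A task asks where it can read a cached block from.
  def getLocations(blockId: String): Set[String] =
    locations.getOrElse(blockId, Set.empty)
}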

3) The partition is actively being computed by another task on the same
executor.  In that case, BlockManager.getOrElseUpdate() will not know about
the other in-flight copy (it only knows about blocks that are fully
materialized, IIUC).  But eventually, when the tasks try to actually write
the data, they'll each try to get a write lock for the block:
https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
One task will get the write lock first; the other task will block on it,
then discover that the block now exists and just return those values.
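
That first-writer-wins behavior can be modeled with a toy sketch like the
following (my own model, not BlockInfoManager itself): the first task to
register the block computes it, and any task that loses the race blocks on
the winner's result and then just reads it:

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object WriteLockSketch {
  private val blocks = new ConcurrentHashMap[String, Promise[Vector[Int]]]()

  def getOrCompute(blockId: String)(compute: => Vector[Int]): Vector[Int] = {
    val myPromise = Promise[Vector[Int]]()
    val winner = blocks.putIfAbsent(blockId, myPromise)
    if (winner == null) {
      // We "hold the write lock": compute and publish the block exactly once.
      val values = compute
      myPromise.success(values)
      values
    } else {
      // Another task got there first: block on it, then return its values.
      Await.result(winner.future, Duration.Inf)
    }
  }
}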

4) The partition is actively being computed by another task on a *different*
executor.  IIUC, Spark doesn't try to do anything to prevent both tasks
from computing the block themselves in this case.  (To do so would require
extra coordination in the driver before writing every single block.)  The
locks in BlockManager and BlockInfoManager don't stop this case, because
the two tasks are running in entirely independent JVMs.
Normally there won't be any problem here -- if the RDD is totally
deterministic, you'll just end up with an extra copy of the data.  In a
way, this is good: the cached RDD is in high demand, so having an extra
copy isn't so bad.
OTOH, if the RDD is nondeterministic, you've now got two copies with
different values.  Then again, the RDD cache is not resilient in general,
so you always have to be able to handle an RDD getting recomputed if it's
evicted from the cache.  So this should be pretty similar.
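
For a concrete picture of the nondeterministic case, consider something
like this (the path and the sampling logic are illustrative; repartition()
has the same flavor, since its round-robin assignment of rows to partitions
can differ across recomputations):

import scala.util.Random

val riskyRDD = sc.textFile("hdfs:///some/input")   // illustrative path
  .mapPartitions { iter =>
    val rng = new Random()        // unseeded: differs on every recompute
    iter.filter(_ => rng.nextDouble() < 0.5)
  }
  .cache()

If two executors race through scenario 4 on the same partition of
riskyRDD, each computes its own sample, so the two cached copies can
disagree.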

On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu <weichen...@databricks.com>
wrote:

> Hmm, I haven't checked the code, but I think if an RDD is referenced in
> several places, the correct behavior should be: when the RDD's data is
> needed, it is computed and then cached only once; otherwise it should be
> treated as a bug.  If you suspect there's a race condition, you could file
> a JIRA ticket.
>
> On Mon, Nov 25, 2019 at 12:21 PM Chang Chen <baibaic...@gmail.com> wrote:
>
>> Sorry, I didn't describe it clearly.  The RDD id itself is thread-safe,
>> but what about the cached data?
>>
>> See this code from BlockManager:
>>
>> def getOrElseUpdate(...) = {
>>   get[T](blockId)(classTag) match {
>>     case ...
>>     case _ =>              // 1. no data is cached
>>       // Need to compute the block
>>   }
>>   // Initially we hold no locks on this block
>>   doPutIterator(...) match { ... }
>> }
>>
>> Consider two DAGs (containing the same cached RDD) running
>> simultaneously: if both get None when they look up the same block in the
>> BlockManager (i.e. #1 above), then I guess the same data would be cached
>> twice.
>>
>> If the later cache overrides the previous data and no memory is wasted,
>> then this is OK.
>>
>> Thanks
>> Chang
>>
>>
>> On Mon, Nov 25, 2019 at 11:52 AM, Weichen Xu <weichen...@databricks.com> wrote:
>>
>>> The RDD id is immutable, and it is generated when the RDD object is
>>> created.  So why would there be a race condition on the RDD id?
>>>
>>> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen <baibaic...@gmail.com>
>>> wrote:
>>>
>>>> I am wondering about the concurrency semantics, in order to reason
>>>> about correctness.  If two queries simultaneously run DAGs which use
>>>> the same cached DF/RDD, but before the data has actually been cached,
>>>> what will happen?
>>>>
>>>> By looking into the code a little, I suspect they would have different
>>>> BlockIds for the same Dataset, which would be unexpected behavior, but
>>>> there is no race condition.
>>>>
>>>> However, the RDD id is not lazy, so there is a race condition.
>>>>
>>>> Thanks
>>>> Chang
>>>>
>>>>
>>>> On Tue, Nov 12, 2019 at 1:22 PM, Weichen Xu <weichen...@databricks.com> wrote:
>>>>
>>>>> Hi Chang,
>>>>>
>>>>> RDDs/DataFrames are immutable and lazily computed, so they are
>>>>> thread-safe.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen <baibaic...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all
>>>>>>
>>>>>> I have a case where I need to cache a source RDD, and then create
>>>>>> different DataFrames from it in different threads to accelerate
>>>>>> queries.
>>>>>>
>>>>>> I know that SparkSession is thread-safe (
>>>>>> https://issues.apache.org/jira/browse/SPARK-15135), but I am not
>>>>>> sure whether RDD is thread-safe or not.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
