Very well put, Imran. This is a variant of executor failure after an RDD has
been computed (including caching). In general, non-determinism in Spark is
going to lead to inconsistency.
The only reasonable solution for us, at that time, was to make
pseudo-randomness repeatable and checkpoint afterwards, so that recomputation
becomes deterministic.
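A minimal sketch of "repeatable pseudo-randomness", in plain Scala so it runs without a cluster. Each partition's generator is seeded from a fixed job seed plus the partition index, so recomputing a partition (after eviction or executor loss) replays the same values. The function has the `(Int, Iterator[T]) => Iterator[U]` shape that Spark's `RDD.mapPartitionsWithIndex` accepts. The names `SeededNoise`, `jobSeed` and `addNoise` are hypothetical, and the approach assumes the rows within each partition arrive in the same order on every recomputation, which is exactly what operations like `repartition()` do not guarantee.

```scala
import scala.util.Random

object SeededNoise {
  // Hypothetical job-wide seed; in practice it would be fixed for the job.
  val jobSeed: Long = 42L

  // Same shape as the function RDD.mapPartitionsWithIndex expects:
  // (partitionIndex, rows of that partition) => transformed rows.
  def addNoise(partitionIndex: Int, rows: Iterator[Double]): Iterator[Double] = {
    // Seed from (jobSeed, partitionIndex): recomputing this partition replays
    // exactly the same pseudo-random sequence, provided the rows arrive in the
    // same order each time.
    val rng = new Random(jobSeed * 31L + partitionIndex)
    rows.map(x => x + rng.nextGaussian())
  }
}
```

In Spark this would be applied with something like `val noisy = rdd.mapPartitionsWithIndex(SeededNoise.addNoise)`. After `sc.setCheckpointDir(...)`, calling `noisy.checkpoint()` before the first job on `noisy` runs pins the result, so later recomputation reads the checkpoint instead of re-running the randomness.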


Regards,
Mridul

On Mon, Nov 25, 2019 at 9:30 AM Imran Rashid <iras...@cloudera.com.invalid>
wrote:

> I think Chang is right, but I also think this only comes up in limited
> scenarios.  I initially thought it wasn't a bug, but after some more
> thought I have some concerns in light of the issues we've had with
> nondeterministic RDDs, e.g. repartition().
>
> Say I have code like this:
>
> val cachedRDD = sc.textFile(...).cache()
> // doSomeAction is a placeholder for any action (count(), collect(), ...)
> (0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }
>
> that is, my cached RDD is referenced by many threads concurrently before
> its data has actually been cached.
>
> When one of those tasks gets to cachedRDD.getOrCompute(), there are a few
> possible scenarios:
>
> 1) The partition has never been referenced before.
> BlockManager.getOrElseUpdate() will find that the block doesn't exist, so
> it will be computed (see
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360
> ).
>
> 2) The partition has been fully materialized by another task and the
> BlockManagerMaster on the driver already knows about it, so
> BlockManager.getOrElseUpdate() will find the cached block (perhaps on
> another node) and return it.
>
> 3) The partition is actively being computed by another task on the same
> executor.  Then BlockManager.getOrElseUpdate() will not know about that
> other task's in-progress computation (it only knows about blocks that are
> fully materialized, IIUC).  But eventually, when the tasks try to actually
> write the data, they'll try to get a write lock for the block:
> https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
> One task will get the write lock first; the other will block until that
> lock is released, then see that the block now exists and just return
> those values.
>
> 4) The partition is actively being computed by another task on a
> *different* executor.  IIUC, Spark doesn't try to do anything to prevent
> both tasks from computing the block themselves in this case.  (Doing so
> would require extra coordination with the driver before writing every
> single block.)  The locks in BlockManager and BlockInfoManager don't
> prevent this case, because it happens in entirely independent JVMs.
> There normally won't be any problem here: if the RDD is totally
> deterministic, you'll just end up with an extra copy of the data.  In
> a way, this is good: the cached RDD is in high demand, so having an extra
> copy isn't so bad.
> OTOH, if the RDD is non-deterministic, you've now got two copies with
> different values.  Then again, the RDD cache is not resilient in general,
> so you always have to be able to handle an RDD getting recomputed if it's
> evicted from the cache.  So this should be pretty similar.
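Scenarios 3 and 4 can be made concrete with a toy model in plain Scala. This is not Spark's `BlockManager`/`BlockInfoManager` code, and every class, method and block name in it is hypothetical. Within one store (one "executor"), a task that misses takes a per-block lock and re-checks before computing, so concurrent tasks compute the block only once. Two stores share nothing, so each computes its own copy, and those copies agree only if the computation is deterministic.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Hypothetical toy model of ONE executor's block store (not Spark's BlockManager).
final class ToyExecutorStore {
  private val blocks = new ConcurrentHashMap[String, Vector[Long]]()
  private val writeLocks = new ConcurrentHashMap[String, AnyRef]()
  val computeCount = new AtomicInteger(0)

  // One lock object per block id, created on first use.
  private def lockFor(blockId: String): AnyRef = {
    val fresh = new Object
    val prev = writeLocks.putIfAbsent(blockId, fresh)
    if (prev == null) fresh else prev
  }

  def getOrElseUpdate(blockId: String)(compute: => Vector[Long]): Vector[Long] =
    Option(blocks.get(blockId)).getOrElse {
      // Scenario 3: a miss takes the per-block write lock ...
      lockFor(blockId).synchronized {
        // ... and re-checks, in case another task in this JVM finished first.
        Option(blocks.get(blockId)).getOrElse {
          computeCount.incrementAndGet()
          val values = compute
          blocks.put(blockId, values)
          values
        }
      }
    }
}

object ScenarioDemo {
  // Scenario 3: `n` threads on the SAME executor ask for one block at once.
  def sameExecutor(n: Int): ToyExecutorStore = {
    val store = new ToyExecutorStore
    val threads = (1 to n).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = store.getOrElseUpdate("rdd_0_0")(Vector(1L, 2L, 3L))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    store
  }

  // Scenario 4: two executors share nothing, so each computes its own copy.
  def differentExecutors(compute: => Vector[Long]): (Vector[Long], Vector[Long]) = {
    val exec1 = new ToyExecutorStore
    val exec2 = new ToyExecutorStore
    (exec1.getOrElseUpdate("rdd_0_0")(compute), exec2.getOrElseUpdate("rdd_0_0")(compute))
  }

  // Stand-in for non-determinism (wall-clock time, unseeded randomness, ...).
  private val clock = new AtomicLong(0)
  def nonDeterministicPartition: Vector[Long] = Vector(clock.incrementAndGet())
}
```

With a deterministic partition, `differentExecutors(Vector(1L, 2L, 3L))` returns two equal copies (the harmless "extra copy"). With `nonDeterministicPartition`, the two executors end up caching different values for the same block.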
>
> On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Hmm, I haven't checked the code, but I think if an RDD is referenced in
>> several places, the correct behavior should be: when this RDD's data is
>> needed, it is computed and then cached only once; anything else should
>> be treated as a bug. If you suspect there's a race condition, you
>> could create a JIRA ticket.
>>
>> On Mon, Nov 25, 2019 at 12:21 PM Chang Chen <baibaic...@gmail.com> wrote:
>>
>>> Sorry I didn't describe it clearly. The RDD id itself is thread-safe, but
>>> what about the cached data?
>>>
>>> See this code from BlockManager:
>>>
>>> def getOrElseUpdate(...) = {
>>>   get[T](blockId)(classTag) match {
>>>     case ...
>>>     case _ =>   // 1. no data is cached.
>>>       // Need to compute the block
>>>   }
>>>   // Initially we hold no locks on this block
>>>   doPutIterator(...) match { ... }
>>> }
>>>
>>> Consider two DAGs (containing the same cached RDD) running
>>> simultaneously: if both get None when they look up the same block in
>>> BlockManager (i.e. #1 above), then I guess the same data would be cached
>>> twice.
>>>
>>> If the later cache overwrote the previous data, so that no memory is
>>> wasted, then this would be OK.
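Chang's concern is the classic check-then-act race. The sketch below is a hypothetical, deliberately lock-free cache (not Spark's code) in which the lookup and the write are separate steps. A `CyclicBarrier` test hook forces two threads to both observe a miss before either writes, so both compute, and the second `put` silently replaces the first. Within one executor, the per-block write lock Imran describes in scenario 3 closes this window; across executors, nothing does.

```scala
import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical naive cache: "get" and "put" are separate, unlocked steps.
final class NaiveCache(afterMiss: () => Unit) {
  private val blocks = new ConcurrentHashMap[String, Vector[Int]]()
  val computeCount = new AtomicInteger(0)

  def getOrElseUpdate(blockId: String)(compute: => Vector[Int]): Vector[Int] =
    Option(blocks.get(blockId)).getOrElse {
      afterMiss()                  // test hook: widens the race window
      computeCount.incrementAndGet()
      val values = compute
      blocks.put(blockId, values)  // a later put replaces an earlier one
      values
    }
}

object NaiveCacheDemo {
  def raceTwoTasks(): NaiveCache = {
    val barrier = new CyclicBarrier(2)
    // Neither task can continue past the miss until both have missed.
    val cache = new NaiveCache(() => { barrier.await(); () })
    val tasks = (1 to 2).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = cache.getOrElseUpdate("rdd_0_0")(Vector(1, 2, 3))
      })
    }
    tasks.foreach(_.start())
    tasks.foreach(_.join())
    cache
  }
}
```

Because neither thread can write until both have looked up and missed, `raceTwoTasks().computeCount` is always 2 here: the block is computed twice, and the store ends up holding just one of the two copies.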
>>>
>>> Thanks
>>> Chang
>>>
>>>
>>> On Mon, Nov 25, 2019 at 11:52 AM Weichen Xu <weichen...@databricks.com> wrote:
>>>
>>>> The RDD id is immutable: it is generated when the RDD object is created.
>>>> So why would there be a race condition on the RDD id?
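A small model of Weichen's point: the id is assigned once, eagerly, in the constructor, and an atomic counter makes concurrent construction safe. To our understanding, Spark's `RDD.id` is a `val` initialised from `SparkContext.newRddId()`, which is backed by an `AtomicInteger`. The classes below (`ToyContext`, `ToyDataset`, `IdDemo`) are hypothetical stand-ins, not Spark's.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a context that hands out ids.
final class ToyContext {
  private val nextId = new AtomicInteger(0)
  def newId(): Int = nextId.getAndIncrement()  // atomic: no two callers get the same id
}

// The id is a val: computed once, in the constructor, and never changed.
final class ToyDataset(ctx: ToyContext) {
  val id: Int = ctx.newId()
}

object IdDemo {
  // Create `perThread` datasets on each of `threads` threads; return all ids.
  def createConcurrently(threads: Int, perThread: Int): Seq[Int] = {
    val ctx = new ToyContext
    val results = Array.ofDim[Int](threads, perThread)  // each thread owns one row
    val workers = (0 until threads).map { t =>
      new Thread(new Runnable {
        def run(): Unit =
          for (i <- 0 until perThread) results(t)(i) = new ToyDataset(ctx).id
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())  // join makes every row visible to this thread
    results.flatten.toSeq
  }
}
```

Creating 8 x 1000 datasets concurrently yields 8000 distinct ids. Id allocation itself is therefore not where a race would be; the open question in this thread is about the cached blocks.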
>>>>
>>>> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen <baibaic...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am wondering about the concurrency semantics, in order to reason
>>>>> about correctness. If two queries simultaneously run DAGs that use the
>>>>> same cached DF/RDD, but before the data has actually been cached, what
>>>>> will happen?
>>>>>
>>>>> Looking into the code a little, I suspect they would get different
>>>>> BlockIds for the same Dataset, which is unexpected behavior, but there
>>>>> is no race condition.
>>>>>
>>>>> However, the RDD id is not lazy, so there is a race condition.
>>>>>
>>>>> Thanks
>>>>> Chang
>>>>>
>>>>>
>>>>> On Tue, Nov 12, 2019 at 1:22 PM Weichen Xu <weichen...@databricks.com> wrote:
>>>>>
>>>>>> Hi Chang,
>>>>>>
>>>>>> RDDs/DataFrames are immutable and lazily computed. They are thread-safe.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen <baibaic...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all
>>>>>>>
>>>>>>> I have a case where I need to cache a source RDD, and then create
>>>>>>> different DataFrames from it in different threads to speed up queries.
>>>>>>>
>>>>>>> I know that SparkSession is thread-safe (see
>>>>>>> https://issues.apache.org/jira/browse/SPARK-15135), but I am not
>>>>>>> sure whether RDDs are thread-safe or not.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
