Thank you, Imran.

I will check whether any memory is wasted.
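
As a rough illustration of scenarios 3 and 4 in the quoted reply below, here
is a toy model in plain Scala. It is not Spark code: ToyBlockStore,
CacheRaceSketch and the block id "rdd_0_0" are made-up names, and a simple
synchronized block stands in for the per-block write lock. It only counts how
often a block is computed when tasks share one store (same executor) versus
when each task has its own store (different executors).

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Toy stand-in for one executor's block store (hypothetical, NOT Spark's BlockManager).
final class ToyBlockStore {
  private val blocks = new ConcurrentHashMap[String, Seq[Int]]()

  // Look the block up; on a miss, take the store's lock, re-check, and only
  // then compute. A concurrent caller on the same store waits on the lock and
  // afterwards finds the block already present.
  def getOrElseUpdate(blockId: String)(compute: => Seq[Int]): Seq[Int] = {
    val cached = blocks.get(blockId)
    if (cached != null) cached
    else blocks.synchronized {
      val again = blocks.get(blockId)
      if (again != null) again
      else {
        val value = compute
        blocks.put(blockId, value)
        value
      }
    }
  }
}

object CacheRaceSketch {
  // Starts `tasks` threads that all request the same block at the same time.
  // `storeFor` maps a task index to the store ("executor") that task runs on.
  // Returns how many times the block was actually computed.
  def run(storeFor: Int => ToyBlockStore, tasks: Int): Int = {
    val computations = new AtomicInteger(0)
    val start = new CountDownLatch(1)
    val threads = (0 until tasks).map { i =>
      new Thread(() => {
        start.await()
        storeFor(i).getOrElseUpdate("rdd_0_0") {
          computations.incrementAndGet()
          Thread.sleep(50) // pretend the partition is expensive to compute
          Seq(1, 2, 3)
        }
        ()
      })
    }
    threads.foreach(_.start())
    start.countDown()
    threads.foreach(_.join())
    computations.get()
  }

  def main(args: Array[String]): Unit = {
    val shared = new ToyBlockStore
    val sameExecutor = run(_ => shared, tasks = 4)

    val stores = Vector.fill(4)(new ToyBlockStore)
    val differentExecutors = run(i => stores(i), tasks = 4)

    println(s"same executor: computed $sameExecutor time(s)")            // 1
    println(s"different executors: computed $differentExecutors time(s)") // 4
  }
}
```

In this model the shared store computes the block once, while four
independent stores compute it four times, which is the "extra copy" case.
Whether that counts as wasted memory in a real job depends on how many
executors end up holding the same partition.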

Imran Rashid <iras...@cloudera.com> wrote on Tue, Nov 26, 2019 at 1:30 AM:

> I think Chang is right, but I also think this only comes up in limited
> scenarios.  I initially thought it wasn't a bug, but after some more
> thought I have some concerns in light of the issues we've had with
> nondeterministic RDDs, e.g. repartition().
>
> Say I have code like this:
>
> val cachedRDD = sc.textFile(...).cache()
> (0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }
>
> That is, my cached RDD is referenced by many threads concurrently before
> any of its partitions have actually been cached.
>
> When one of those tasks gets to cachedRDD.getOrCompute(), there are a few
> possible scenarios:
>
> 1) The partition has never been referenced before.
> BlockManager.getOrElseUpdate() will find that the block doesn't exist, so
> it will get computed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360
> )
>
> 2) The partition has been fully materialized by another task, and the
> BlockManagerMaster on the driver already knows about it, so
> BlockManager.getOrElseUpdate() will return a pointer to the cached block
> (perhaps on another node).
>
> 3) The partition is actively being computed by another task on the same
> executor.  Then BlockManager.getOrElseUpdate() will not know about that
> other in-progress computation (it only knows about blocks that are fully
> materialized, IIUC).  But eventually, when the tasks try to actually write
> the data, they'll try to get a write lock for the block:
> https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
> One task will get the write lock first; the second task will block until
> the first one finishes, then realize the block exists and just return
> those values.
>
> 4) The partition is actively being computed by another task on a
> *different* executor.  IIUC, Spark doesn't try to do anything to prevent
> both tasks from computing the block themselves in this case.  (To do so
> would require extra coordination in the driver before writing every single
> block.)  Those locks in BlockManager and BlockInfoManager don't stop this
> case, because it is happening in entirely independent JVMs.
> There normally won't be any problem here: if the RDD is totally
> deterministic, then you'll just end up with an extra copy of the data.  In
> a way, this is good: the cached RDD is in high demand, so having an extra
> copy isn't so bad.
> OTOH, if the RDD is non-deterministic, you've now got two copies with
> different values.  Then again, the RDD cache is not resilient in general,
> so you've always got to be able to handle an RDD getting recomputed if it's
> evicted from the cache.  So this should be pretty similar.
>
> On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Hmm, I haven't checked the code, but I think if an RDD is referenced in
>> several places, the correct behavior should be: when this RDD's data is
>> needed, it is computed and then cached only once. Otherwise it should
>> be treated as a bug. If you suspect there's a race condition, you
>> could create a JIRA ticket.
>>
>> On Mon, Nov 25, 2019 at 12:21 PM Chang Chen <baibaic...@gmail.com> wrote:
>>
>>> Sorry, I didn't describe it clearly. The RDD id itself is thread-safe,
>>> but how about the cached data?
>>>
>>> See this code from BlockManager:
>>>
>>> def getOrElseUpdate(...) = {
>>>   get[T](blockId)(classTag) match {
>>>     case ...
>>>     case _ => // 1. No data is cached.
>>>       // Need to compute the block
>>>   }
>>>   // Initially we hold no locks on this block
>>>   doPutIterator(...) match {..}
>>> }
>>>
>>> Consider two DAGs (containing the same cached RDD) running
>>> simultaneously: if both get None when they look up the same block in
>>> BlockManager (i.e. #1 above), then I guess the same data would be cached
>>> twice.
>>>
>>> If the later cache write overrides the previous data and no memory is
>>> wasted, then this is OK.
>>>
>>> Thanks
>>> Chang
>>>
>>>
>>> Weichen Xu <weichen...@databricks.com> wrote on Mon, Nov 25, 2019 at 11:52 AM:
>>>
>>>> The RDD id is immutable; it is generated when the RDD object is
>>>> created. So why would there be a race condition on the "RDD id"?
>>>>
>>>> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen <baibaic...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am wondering about the concurrency semantics, in order to reason
>>>>> about correctness. If two queries simultaneously run DAGs that use the
>>>>> same cached DF/RDD before the data has actually been cached, what will
>>>>> happen?
>>>>>
>>>>> After looking into the code a little, I suspect they would have
>>>>> different BlockIds for the same Dataset, which is unexpected behavior,
>>>>> but there is no race condition.
>>>>>
>>>>> However, the RDD id is not lazy, so there is a race condition.
>>>>>
>>>>> Thanks
>>>>> Chang
>>>>>
>>>>>
>>>>> Weichen Xu <weichen...@databricks.com> wrote on Tue, Nov 12, 2019 at 1:22 PM:
>>>>>
>>>>>> Hi Chang,
>>>>>>
>>>>>> RDDs and DataFrames are immutable and lazily computed. They are thread-safe.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen <baibaic...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a case where I need to cache a source RDD and then create
>>>>>>> different DataFrames from it in different threads to speed up queries.
>>>>>>>
>>>>>>> I know that SparkSession is thread-safe (
>>>>>>> https://issues.apache.org/jira/browse/SPARK-15135), but I am not
>>>>>>> sure whether an RDD is thread-safe or not.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
