Thank you, Imran. I will check whether there is memory waste or not.
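As a quick way to look for that memory waste, here is a minimal driver-side sketch (assuming a SparkContext sc and the cachedRDD from Imran's example below; getRDDStorageInfo is a developer API, and whether an extra copy of a block computed on a second executor shows up in these aggregate numbers is an assumption worth verifying):

    // Storage summary for the cached RDD, as reported to the driver.
    val info = sc.getRDDStorageInfo.find(_.id == cachedRDD.id)
    info.foreach { i =>
      println(s"rdd=${i.id} cachedPartitions=${i.numCachedPartitions}/${i.numPartitions} " +
        s"memSize=${i.memSize} diskSize=${i.diskSize}")
    }

    // Per-executor storage memory, useful for spotting one executor holding
    // noticeably more cached data than expected.
    sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
      println(s"$executor: ${maxMem - remaining} of $maxMem bytes used for caching")
    }

The Storage tab of the web UI exposes roughly the same per-RDD and per-executor information without any code.
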
On Tue, Nov 26, 2019 at 1:30 AM Imran Rashid <iras...@cloudera.com> wrote:

> I think Chang is right, but I also think this only comes up in limited scenarios. I initially thought it wasn't a bug, but after some more thought I have some concerns in light of the issues we've had with nondeterministic RDDs, e.g. repartition().
>
> Say I have code like this:
>
>   val cachedRDD = sc.textFile(...).cache()
>   (0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }
>
> that is, my cached RDD is referenced by many threads concurrently before the RDD has been cached.
>
> When one of those tasks gets to cachedRDD.getOrCompute(), there are a few possible scenarios:
>
> 1) The partition has never been referenced before. BlockManager.getOrCompute() will say the block doesn't exist, so it will get recomputed (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360).
>
> 2) The partition has been fully materialized by another task, and the BlockManagerMaster on the driver already knows about it, so BlockManager.getOrCompute() will return a pointer to the cached block (perhaps on another node).
>
> 3) The partition is actively being computed by another task on the same executor. Then BlockManager.getOrCompute() will not know about that other in-progress copy of the block (it only knows about blocks that are fully materialized, IIUC). But eventually, when the tasks try to actually write the data, they will try to take a write lock for the block: https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218. One task gets the write lock first; the other blocks on it, then sees that the block now exists and just returns those values.
>
> 4) The partition is actively being computed by another task on a *different* executor. IIUC, Spark doesn't try to do anything to prevent both tasks from computing the block themselves in this case. (To do so would require extra coordination in the driver before writing every single block.) The locks in BlockManager and BlockInfoManager don't stop this case, because it happens in entirely independent JVMs.
>
> There normally won't be any problem here -- if the RDD is totally deterministic, you'll just end up with an extra copy of the data. In a way this is good: the cached RDD is in high demand, so having an extra copy isn't so bad. OTOH, if the RDD is non-deterministic, you've now got two copies with different values. Then again, the RDD cache is not resilient in general, so you always have to be able to handle an RDD getting recomputed if it is evicted from the cache. So this should be pretty similar.
>
> On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu <weichen...@databricks.com> wrote:
>
>> Emmm, I haven't checked the code, but I think that if an RDD is referenced in several places, the correct behavior should be: when this RDD's data is needed, it is computed and then cached only once; otherwise it should be treated as a bug. If you suspect there is a race condition, you could create a JIRA ticket.
>>
>> On Mon, Nov 25, 2019 at 12:21 PM Chang Chen <baibaic...@gmail.com> wrote:
>>
>>> Sorry, I didn't describe it clearly. The RDD id itself is thread-safe, but what about the cached data?
>>>
>>> See this code from BlockManager:
>>>
>>>   def getOrElseUpdate(...) = {
>>>     get[T](blockId)(classTag) match {
>>>       case ...
>>>       case _ =>
>>>         // 1. No data is cached.
>>>         // Need to compute the block.
>>>     }
>>>     // Initially we hold no locks on this block
>>>     doPutIterator(...) match { .. }
>>>   }
>>>
>>> Consider two DAGs (containing the same cached RDD) running simultaneously: if both get None back when they ask the BlockManager for the same block (i.e. #1 above), then I guess the same data would be cached twice.
>>>
>>> If the later cache overrides the earlier data and no memory is wasted, then this is OK.
>>>
>>> Thanks
>>> Chang
>>>
>>> On Mon, Nov 25, 2019 at 11:52 AM Weichen Xu <weichen...@databricks.com> wrote:
>>>
>>>> The RDD id is immutable and is generated when the RDD object is created. So why would there be a race condition on the "rdd id"?
>>>>
>>>> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen <baibaic...@gmail.com> wrote:
>>>>
>>>>> I am wondering about the concurrency semantics so that I can reason about correctness. If two queries simultaneously run DAGs that use the same cached DF/RDD, but before the data has actually been cached, what will happen?
>>>>>
>>>>> From a quick look at the code, I suspect they get different BlockIds for the same Dataset, which would be unexpected behavior, but there is no race condition there.
>>>>>
>>>>> However, the RDD id is not lazy, so there is a race condition.
>>>>>
>>>>> Thanks
>>>>> Chang
>>>>>
>>>>> On Tue, Nov 12, 2019 at 1:22 PM Weichen Xu <weichen...@databricks.com> wrote:
>>>>>
>>>>>> Hi Chang,
>>>>>>
>>>>>> RDDs/DataFrames are immutable and lazily computed. They are thread-safe.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen <baibaic...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a case where I need to cache a source RDD and then create different DataFrames from it in different threads to accelerate queries.
>>>>>>>
>>>>>>> I know that SparkSession is thread-safe (https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure whether RDD is thread-safe or not.
>>>>>>>
>>>>>>> Thanks
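
Not something from the thread, but a small self-contained sketch (names like CacheRaceCheck and computeCount are illustrative, and the exact counts depend on scheduling) of one way to observe whether partitions of a cached RDD are computed more than once when many threads trigger actions on it concurrently, i.e. the situation in scenarios 3 and 4 above:

    import org.apache.spark.sql.SparkSession

    object CacheRaceCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("cache-race-check").getOrCreate()
        val sc = spark.sparkContext

        // Counts how many times a partition's compute function actually runs.
        val computeCount = sc.longAccumulator("partitionComputations")

        // The increment happens inside the partition computation, so it only grows
        // when a partition is (re)computed rather than served from the cache.
        val cachedRDD = sc.parallelize(1 to 1000000, 8)
          .mapPartitions { iter => computeCount.add(1); iter.map(_ * 2) }
          .cache()

        // Many concurrent actions on the not-yet-materialized cached RDD, mirroring
        // the (0 until 200).par.foreach pattern in Imran's example.
        // (On Scala 2.13, .par needs the scala-parallel-collections module.)
        (0 until 20).par.foreach { _ => cachedRDD.count() }

        // With 8 partitions, a value noticeably above 8 suggests that some partitions
        // were computed more than once before the cache was fully populated.
        println(s"partition computations: ${computeCount.value}")
        spark.stop()
      }
    }

Per the discussion above, even when duplicate computation happens, the write lock in BlockManager should keep a block from being stored twice on the same executor; duplicate cached copies are only expected across different executors.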