[ 
https://issues.apache.org/jira/browse/SPARK-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371394#comment-14371394
 ] 

Mark Khaitman commented on SPARK-5782:
--------------------------------------

I didn't think this would be an extreme example when I was writing it, but 
it single-handedly demonstrates both problems that are occurring. The main 
problem seems to arise when RDDs are joined in the style of a tree, as opposed 
to chaining them.

The following example single-handedly maxed out memory on a bunch of servers 
(the python workers went way outside their allowable bounds, to more than 
4-5GB each):

If anyone can confirm whether the same sort of problem occurs when doing 
something similar to this, that would be great.

Example:

# Six identical RDDs: 1000 (key, value) pairs over 5 keys, in 32 partitions each
rdd1 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd2 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd3 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd4 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd5 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd6 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)

# Level 1: join neighbouring RDDs pairwise
rdd_j1 = rdd1.join(rdd2)
rdd_j2 = rdd2.join(rdd3)
rdd_j3 = rdd3.join(rdd4)
rdd_j4 = rdd4.join(rdd5)
rdd_j5 = rdd5.join(rdd6)

# Level 2: join neighbouring level-1 results
rdd_j1_2 = rdd_j1.join(rdd_j2)
rdd_j2_3 = rdd_j2.join(rdd_j3)
rdd_j3_4 = rdd_j3.join(rdd_j4)
rdd_j4_5 = rdd_j4.join(rdd_j5)

# Level 3: join the level-2 results
rdd_f1 = rdd_j1_2.join(rdd_j2_3)
rdd_f2 = rdd_j3_4.join(rdd_j4_5)

# Top of the tree: the count() triggers the whole computation
rdd_final = rdd_f1.join(rdd_f2)
rdd_final.count()
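
To put a number on how extreme the example above is, here is a rough sketch in plain Python (no Spark needed) of how many records each level of the join tree would produce. It only assumes that an inner join emits, per key, the product of the two sides' record counts for that key; the function name and its parameters are made up for this illustration.

```python
def join_tree_counts(records_per_key=200, num_keys=5, levels=4):
    """Total output records at each level of the join tree.

    Every input RDD holds 1000 pairs spread evenly over 5 keys, i.e. 200
    records per key. Both sides of every join have the same per-key count,
    so each level squares the per-key count of the level below it.
    """
    per_key = records_per_key
    totals = []
    for _ in range(levels):
        per_key = per_key * per_key      # inner join: n * n pairs per key
        totals.append(per_key * num_keys)
    return totals


# Levels: rdd_j*, rdd_j*_*, rdd_f*, rdd_final
for level, total in enumerate(join_tree_counts(), start=1):
    print("level %d: %d records" % (level, total))
```

Under that assumption the first level already yields 200,000 records per join, the second 8 billion, and the final join 5 * 200**16 (about 3.3e37) records, so the count() could never finish; what matters for this issue is how the python workers behave on the way there.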

> Python Worker / Pyspark Daemon Memory Issue
> -------------------------------------------
>
>                 Key: SPARK-5782
>                 URL: https://issues.apache.org/jira/browse/SPARK-5782
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 1.3.0, 1.2.1, 1.2.2
>         Environment: CentOS 7, Spark Standalone
>            Reporter: Mark Khaitman
>            Priority: Blocker
>
> I'm including the Shuffle component on this, as a brief scan through the code 
> (which I'm not 100% familiar with just yet) shows a large amount of memory 
> handling in it:
> It appears that any type of join between two RDDs spawns twice as many 
> pyspark.daemon workers compared to the default 1 task -> 1 core configuration 
> in our environment. This can become problematic in cases where you build 
> up a tree of RDD joins, since the pyspark.daemons do not cease to exist until 
> the top level join is completed (or so it seems). This can lead to memory 
> exhaustion by a single framework, even though it is set to have a 512MB python 
> worker memory limit and a few gigs of executor memory.
> Another related issue is that the individual python workers are not 
> supposed to go far beyond 512MB; otherwise they're supposed to 
> spill to disk.
> Some of our python workers are somehow reaching 2GB each, which, when 
> multiplied by the number of cores per executor and, in some cases, by the 
> number of joins occurring, causes the Out-of-Memory killer to step up to its 
> unfortunate job! :(
> I think with the _next_limit method in shuffle.py, if the current memory 
> usage is close to the memory limit, then a 1.05 multiplier can endlessly 
> cause more memory to be consumed by the single python worker, since the max 
> of (512 vs 511 * 1.05) would end up blowing up towards the latter of the 
> two... Shouldn't the memory limit be the absolute cap in this case?
> I've only just started looking into the code, and would definitely love to 
> contribute towards Spark, though I figured it might be quicker to resolve if 
> someone already owns the code!
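
The concern about _next_limit in the description above can be sketched in plain Python. This is not copied from shuffle.py: it only models the rule as described (the larger of the memory limit and 1.05 times the current usage), and it assumes, hypothetically, a worst case where spilling frees no memory and the worker then fills up to whatever limit it was just given. The function names are made up for this illustration.

```python
def next_limit_as_described(memory_limit_mb, used_mb):
    # Rule as described in the report: the larger of the configured limit
    # and 1.05 times the memory currently in use.
    return max(memory_limit_mb, used_mb * 1.05)


def simulate(memory_limit_mb=512, used_mb=511, rounds=10):
    """Successive limits under the worst-case assumption stated above."""
    limits = []
    for _ in range(rounds):
        limit = next_limit_as_described(memory_limit_mb, used_mb)
        limits.append(limit)
        used_mb = limit  # assumed worst case: usage grows to the new limit
    return limits


limits = simulate()
for round_no, limit in enumerate(limits, start=1):
    print("round %2d: limit %.1f MB" % (round_no, limit))
```

With usage at 511MB the very first limit is already above 512MB, and under this assumption it keeps growing by about 5% per round, so the configured value acts as a floor rather than a cap.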


