[ 
https://issues.apache.org/jira/browse/SPARK-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371661#comment-14371661
 ] 

Sean Owen commented on SPARK-5782:
----------------------------------

I think it might be a useful comparison to try this in Scala. For example, even 
this slows to a crawl, since it's a shuffle of a shuffle of a shuffle with no 
persistence:

{code}
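// 6 RDDs of 1000 (key, value) pairs each: 5 distinct keys, 32 partitions
val rdds = (0 until 6).map(n => sc.parallelize((0 until 1000).map(i => (i % 5, i)), 32))
// Join each adjacent pair: 5 RDDs, each the result of one shuffle
val rdds2 = (1 until rdds.size).map(i => rdds(i-1).join(rdds(i)))
// Join adjacent pairs again: 4 RDDs, each a join of two joins, nothing persisted
val rdds3 = (1 until rdds2.size).map(i => rdds2(i-1).join(rdds2(i)))
// Trigger the computation
rdds3(0).count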
{code}

I don't see it dying, but the executors are using a couple of gigabytes. It 
doesn't seem crazy given the bookkeeping for the shuffle.
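
For a sense of scale, the record counts produced by the snippet above can be 
worked out from the key distribution alone. The following is a rough sketch 
in plain Python that models only per-key record counts, not Spark itself; the 
helper name `join_key_counts` is made up for illustration. It assumes an 
inner join pairs every left record with every right record sharing its key.

```python
from collections import Counter

def join_key_counts(left, right):
    """Per-key record counts of an inner join (cross product within each key)."""
    return Counter({k: left[k] * right[k] for k in left if k in right})

# Each input RDD in the snippet has keys i % 5 for i in 0 until 1000,
# which gives 5 keys with 200 records each.
base = Counter(i % 5 for i in range(1000))

level1 = join_key_counts(base, base)      # key counts of one element of rdds2
level2 = join_key_counts(level1, level1)  # key counts of one element of rdds3

records_level1 = sum(level1.values())
records_level2 = sum(level2.values())
print(records_level1, records_level2)  # 200000 8000000000
```

Under that model, each first-level join yields 200,000 records and each 
second-level join yields 8 billion, so the nested case is dominated by a 
small number of heavily repeated keys.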

The answer may be that you can't expect this to take just 2GB. I don't know 
the Python side, or whether it should die differently or something. I don't 
know if it's an issue with a join per se, rather than with so many concurrent, 
repeated, nested joins that keep being recomputed simultaneously.

That is, maybe have a look at what a single, but huge, join does, as that may 
be more realistic.

> Python Worker / Pyspark Daemon Memory Issue
> -------------------------------------------
>
>                 Key: SPARK-5782
>                 URL: https://issues.apache.org/jira/browse/SPARK-5782
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 1.3.0, 1.2.1, 1.2.2
>         Environment: CentOS 7, Spark Standalone
>            Reporter: Mark Khaitman
>            Priority: Blocker
>
> I'm including the Shuffle component on this, as a brief scan through the code 
> (which I'm not 100% familiar with just yet) shows a large amount of memory 
> handling in it:
> It appears that any type of join between two RDDs spawns twice as many 
> pyspark.daemon workers compared to the default 1 task -> 1 core configuration 
> in our environment. This can become problematic in the cases where you build 
> up a tree of RDD joins, since the pyspark.daemons do not cease to exist until 
> the top level join is completed (or so it seems)... This can lead to memory 
> exhaustion by a single framework, even though it is set to have a 512MB 
> python worker memory limit and a few gigs of executor memory.
> Another related issue is that the individual python workers are not 
> supposed to grow much beyond 512MB; past that point they're supposed to 
> spill to disk.
> Some of our python workers are somehow reaching 2GB each, which, when 
> multiplied by the number of cores per executor and the number of joins 
> occurring, in some cases causes the Out-of-Memory killer to step up to its 
> unfortunate job! :(
> I think with the _next_limit method in shuffle.py, if the current memory 
> usage is close to the memory limit, then a 1.05 multiplier can endlessly 
> cause more memory to be consumed by the single python worker, since the max 
> of (512 vs 511 * 1.05) would end up blowing up towards the latter of the 
> two... Shouldn't the memory limit be the absolute cap in this case?
> I've only just started looking into the code, and would definitely love to 
> contribute towards Spark, though I figured it might be quicker to resolve if 
> someone already owns the code!
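
The `_next_limit` concern in the quoted description can be illustrated with a 
small model. This is only a sketch of the rule as the reporter describes it 
(the maximum of the configured limit and 1.05 times current usage), not 
PySpark's actual code. The function names and the `capped` option are 
hypothetical, and the simulation assumes the worst case where spilling frees 
no memory and the worker then grows right up to each new threshold.

```python
def next_limit(memory_limit_mb, used_mb, factor=1.05):
    """Rule as described in the report: max(limit, used * factor)."""
    return max(memory_limit_mb, used_mb * factor)

def simulate(memory_limit_mb, used_mb, steps, capped=False):
    """Return the successive spill thresholds under the worst-case assumption."""
    thresholds = []
    for _ in range(steps):
        threshold = next_limit(memory_limit_mb, used_mb)
        if capped:
            # Hypothetical variant: treat the configured limit as an absolute cap.
            threshold = min(threshold, memory_limit_mb)
        thresholds.append(threshold)
        used_mb = threshold  # assumption: usage climbs to the threshold, nothing is freed
    return thresholds

uncapped = simulate(512, 511, steps=30)
capped = simulate(512, 511, steps=30, capped=True)

print(round(uncapped[0], 2))   # 536.55, i.e. max(512, 511 * 1.05)
print(uncapped[-1] > 2048)     # True: 30 steps of 5% growth pass 2GB
print(set(capped))             # {512}
```

Under these assumptions the threshold compounds by 5% each time and passes 
2GB within 30 steps, while the capped variant stays at 512MB. Whether a real 
worker follows this path depends on how much memory a spill actually 
releases.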



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
