[ https://issues.apache.org/jira/browse/SPARK-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14372039#comment-14372039 ]

Mark Khaitman commented on SPARK-5782:
--------------------------------------

This might also be a problem specific to re-using Python workers... When the 
daemon reconnects to a worker, it never inspects the worker's current memory 
usage upon re-connecting. Then we go into a mergeCombiners case (for example), 
and if there are fewer than 1000 items (the default batch size) to be merged, 
nothing gets spilled to disk, nor does anything get garbage collected.

For example, from line 293 in shuffle.py:

            c += 1
            if c % batch == 0 and get_used_memory() > self.memory_limit:
                self._spill()
                self._partitioned_mergeCombiners(iterator, self._next_limit())
                break

However, c starts at 0, so if there are always fewer than 1000 items to merge 
in a task, memory can just keep growing each time the worker is reused for the 
next task...? Hope I'm not going off on a tangent here.

> Python Worker / Pyspark Daemon Memory Issue
> -------------------------------------------
>
>                 Key: SPARK-5782
>                 URL: https://issues.apache.org/jira/browse/SPARK-5782
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 1.3.0, 1.2.1, 1.2.2
>         Environment: CentOS 7, Spark Standalone
>            Reporter: Mark Khaitman
>            Priority: Blocker
>
> I'm including the Shuffle component on this, as a brief scan through the code 
> (which I'm not 100% familiar with just yet) shows a large amount of memory 
> handling in it:
> It appears that any type of join between two RDDs spawns twice as many 
> pyspark.daemon workers compared to the default 1 task -> 1 core configuration 
> in our environment. This can become problematic in cases where you build up a 
> tree of RDD joins, since the pyspark.daemon processes do not cease to exist 
> until the top-level join is completed (or so it seems)... This can lead to 
> memory exhaustion by a single framework, even though it is set to have a 
> 512MB Python worker memory limit and a few gigs of executor memory.
> Another related issue is that the individual Python workers are not supposed 
> to exceed 512MB by very much; beyond that, they're supposed to spill to disk.
> Some of our Python workers are somehow reaching 2GB each (which, when 
> multiplied by the number of cores per executor and the number of joins 
> occurring in some cases, adds up quickly), causing the out-of-memory killer 
> to step up to its unfortunate job! :(
> I think that with the _next_limit method in shuffle.py, if the current memory 
> usage is close to the memory limit, the 1.05 multiplier can endlessly let the 
> single Python worker consume more memory, since the max of (512 vs. 511 * 
> 1.05) ends up blowing up towards the latter of the two... Shouldn't the 
> memory limit be the absolute cap in this case?
> I've only just started looking into the code, and would definitely love to 
> contribute towards Spark, though I figured it might be quicker to resolve if 
> someone already owns the code!


