[
https://issues.apache.org/jira/browse/SPARK-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371394#comment-14371394
]
Mark Khaitman commented on SPARK-5782:
--------------------------------------
I didn't think this would be an extreme example when I was making it... but
it single-handedly demonstrates both problems that are occurring. It seems
that the main problem arises when RDDs are joined in the style of a tree, as
opposed to chaining them...
The following example single-handedly maxed out memory on a bunch of servers
(way outside the allowable bounds on the python workers, >4-5GB each).
If anyone can confirm whether the same sort of problem occurs when doing
something similar to this, that would be great...
Example:
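# Assumes `sc` is an existing SparkContext (e.g. from the pyspark shell).
# Six identical base RDDs: 1000 (key, value) pairs, 5 distinct keys, 32 partitions.
rdd1 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd2 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd3 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd4 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd5 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
rdd6 = sc.parallelize([(i % 5, i) for i in range(1000)], 32)
# Level 1: join neighbouring base RDDs.
rdd_j1 = rdd1.join(rdd2)
rdd_j2 = rdd2.join(rdd3)
rdd_j3 = rdd3.join(rdd4)
rdd_j4 = rdd4.join(rdd5)
rdd_j5 = rdd5.join(rdd6)
# Level 2: join neighbouring level-1 results.
rdd_j1_2 = rdd_j1.join(rdd_j2)
rdd_j2_3 = rdd_j2.join(rdd_j3)
rdd_j3_4 = rdd_j3.join(rdd_j4)
rdd_j4_5 = rdd_j4.join(rdd_j5)
# Levels 3 and 4: keep joining until a single RDD remains.
rdd_f1 = rdd_j1_2.join(rdd_j2_3)
rdd_f2 = rdd_j3_4.join(rdd_j4_5)
rdd_final = rdd_f1.join(rdd_f2)
# The action that triggers evaluation of the whole tree.
rdd_final.count()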
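
For anyone trying to reproduce this, it may help to see how quickly the record counts grow in the join tree above. The following is only a back-of-the-envelope model in plain Python (no Spark needed). It assumes inner-join semantics, where each key produces (left matches) x (right matches) output records; the names KEYS, PER_KEY and join_per_key are made up for this illustration.

```python
# Rough size model for the join tree above; not Spark code.
KEYS = 5                  # i % 5 yields keys 0..4
PER_KEY = 1000 // KEYS    # 200 values per key in every base RDD

def join_per_key(left, right):
    """Records emitted per key by an inner join: left * right."""
    return left * right

level1 = join_per_key(PER_KEY, PER_KEY)   # rdd_j1 .. rdd_j5
level2 = join_per_key(level1, level1)     # rdd_j1_2 .. rdd_j4_5
level3 = join_per_key(level2, level2)     # rdd_f1, rdd_f2
level4 = join_per_key(level3, level3)     # rdd_final

for name, per_key in [("level 1", level1), ("level 2", level2),
                      ("level 3", level3), ("level 4", level4)]:
    # Total records = records per key * number of keys
    print(name, per_key * KEYS)
```

Under that model the first level already holds 200,000 records per joined RDD and the second level about 8 billion, so the example is far more extreme than it looks and count() on rdd_final is not expected to finish. A smaller range() or more distinct keys would likely be needed to compare tree-style against chained joins.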
> Python Worker / Pyspark Daemon Memory Issue
> -------------------------------------------
>
> Key: SPARK-5782
> URL: https://issues.apache.org/jira/browse/SPARK-5782
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Shuffle
> Affects Versions: 1.3.0, 1.2.1, 1.2.2
> Environment: CentOS 7, Spark Standalone
> Reporter: Mark Khaitman
> Priority: Blocker
>
> I'm including the Shuffle component on this, as a brief scan through the code
> (which I'm not 100% familiar with just yet) shows a large amount of memory
> handling in it:
> It appears that any type of join between two RDDs spawns twice as many
> pyspark.daemon workers compared to the default 1 task -> 1 core configuration
> in our environment. This can become problematic in cases where you build
> up a tree of RDD joins, since the pyspark.daemons do not cease to exist until
> the top-level join is completed (or so it seems)... This can lead to memory
> exhaustion by a single framework, even though it is set to have a 512MB python
> worker memory limit and a few gigs of executor memory.
> Another related issue is that the individual python workers are not
> supposed to grow much beyond 512MB; otherwise they're supposed to
> spill to disk.
> Some of our python workers are somehow reaching 2GB each, which, when
> multiplied by the number of cores per executor and the number of joins
> occurring in some cases, causes the Out-of-Memory killer to step up to its
> unfortunate job! :(
> I think with the _next_limit method in shuffle.py, if the current memory
> usage is close to the memory limit, then a 1.05 multiplier can endlessly
> cause more memory to be consumed by the single python worker, since the max
> of (512 vs 511 * 1.05) would end up blowing up towards the latter of the
> two... Shouldn't the memory limit be the absolute cap in this case?
> I've only just started looking into the code, and would definitely love to
> contribute towards Spark, though I figured it might be quicker to resolve if
> someone already owns the code!
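
The _next_limit concern in the description above can be sketched in a few lines. This is a simplified model of the rule as described there (the larger of the configured limit and 1.05 times current usage), not a copy of shuffle.py. The function names, and the assumption that memory is never released after a spill, are hypothetical and chosen to show the worst case.

```python
def next_limit(memory_limit_mb, used_mb):
    """Model of the described rule: max(configured limit, used * 1.05)."""
    return max(memory_limit_mb, used_mb * 1.05)

def simulate(memory_limit_mb=512, used_mb=511, spills=10):
    """Worst case (assumed): a spill frees nothing and the worker then
    fills up to whatever the new limit is."""
    limits = []
    for _ in range(spills):
        limit = next_limit(memory_limit_mb, used_mb)
        limits.append(limit)
        used_mb = limit  # usage climbs to the new limit before the next spill
    return limits

limits = simulate()
for i, limit in enumerate(limits, start=1):
    print("spill %d: limit is now %.1f MB" % (i, limit))
```

In this model the effective limit grows by 5% on every spill once usage is near the configured value, so the 512MB setting acts as a floor and not as a cap. Whether real workers behave this way depends on how much memory is actually released after a spill.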