[
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120537#comment-16120537
]
Ryan Williams commented on SPARK-21682:
---------------------------------------
bq. But do you really need to create so many partitions? Could you use
`coalesce` to reduce the number of partitions?
In my real app, that is exactly what I am trying to do! I am {{count}}'ing the
records to determine how many partitions I should {{coalesce}} to, and I am
{{cache}}'ing to avoid computing the RDD twice (once for the {{count}}, once
for the repartition) because that would be very expensive.
More detail: I have a large RDD (~100BN records, 100k partitions) that I am
filtering down to what is likely to be <1MM records (but might not be!). ~1MM
records/partition is a good size for this data, both before and after the
filtering/repartitioning, based on what I know about it (and have already
observed in this app when I try to put much more than that on a single
partition, and see GC problems).
If that sounds crazy to you, please tell me.
----
Otherwise, can we instead talk about:
* should Spark fall over at 100k partitions?
* this worked fine in 1.6.3 but afaict it's impossible to {{count}} a
100k-partition cached RDD in the 2.x line. Is that a problem?
* does anyone know how much work the driver is doing before and after e.g. a
large accumulator is added, and how much the "maximum number of partitions
before GC-stall-death" ceiling is lowered by a given change?
> Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
> --------------------------------------------------------------------
>
> Key: SPARK-21682
> URL: https://issues.apache.org/jira/browse/SPARK-21682
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.2, 2.1.1, 2.2.0
> Reporter: Ryan Williams
>
> h3. Summary
> * {{sc.parallelize(1 to 100000, 100000).cache.count}} causes a driver GC
> stall midway through on every configuration and version I've tried in 2.x.
> * It runs fine with no Full GCs as of 1.6.3
> * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and
> breaks a contract about what big-O sizes accumulators' values can be:
> ** they are each of size O(P), where P is the number of partitions in a
> cached RDD
> ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of
> O(P)
> ** ⇒ the driver also must process O(P*E) work every 10s from
> {{ExecutorMetricsUpdates}} (where E is the number of executors; cf.
> {{spark.executor.heartbeatInterval}})
> * when operating on a 100k-partition cached RDD, the driver enters a GC loop
> due to all the allocations it must do to process {{ExecutorMetricsUpdate}}
> and {{TaskEnd}} events with {{updatedBlockStatuses}} attached
> * this metric should be disabled, or some ability to blacklist it from the
> command-line should be added.
> * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed
> one part of this - the event-log size had exploded - but the root problem
> still exists / is worse
> h3. {{count}} a 100k-partition RDD: works fine without {{.cache}}
> In Spark 2.2.0 or 2.1.1:
> {code}
> spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps -verbose:gc"
> scala> val rdd = sc.parallelize(1 to 100000, 100000)
> scala> rdd.count
> {code}
> In YARN and local modes, this finishes in ~20s seconds with ~20 partial GCs
> logged, all taking under 0.1s ([example
> output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]);
> all is well!
> h3. {{count}} a 100k-partition cached RDD: GC-dies
> If we {{cache}} the RDD first, the same {{count}} job quickly sends the
> driver into a GC death spiral: full GC's start after a few thousand tasks and
> increase in frequency and length until they last minutes / become continuous
> (and, in YARN, the driver loses contact with any executors).
> Example outputs:
> [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes],
>
> [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies].
> The YARN example removes any confusion about whether the storing of the
> blocks is causing memory pressure on the driver; the driver is basically
> doing no work except receiving executor updates and events, and yet it
> becomes overloaded.
> h3. Can't effectively throw driver heap at the problem
> I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd
> expect: delay the first Full GC, and make Full GCs longer when they happen.
> I don't have a clear sense on whether the onset is linear or quadratic (i.e.
> do I get twice as far into the job before the first Full GC with a 20GB as
> with a 10GB heap, or only sqrt(2) times as far?).
> h3. Mostly memory pressure, not OOMs
> An interesting note is that I'm rarely seeing OOMs as a result of this, even
> on small heaps.
> I think this is consistent with the idea that all this data is being
> immediately discarded by the driver, as opposed to kept around to serve web
> UIs or somesuch.
> h3. Eliminating {{ExecutorMetricsUpdate}}'s doesn't seem to help
> Interestingly, setting large values of {{spark.executor.heartbeatInterval}}
> doesn't seem to mitigate the problem; GC-stall sets in at about the same
> point in the {{count}} job.
> This implies that, in this example, the {{TaskEnd}} events are doing most or
> all of the damage.
> h3. CMS helps but doesn't solve the problem
> In some rough testing, I saw the {{count}} get about twice as far before
> dying when using the CMS collector.
> h3. What bandwidth do we expect the driver to process events at?
> IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500
> executors, and allocating objects for these updates is pushing it over a
> tipping point where it can't keep up.
> I don't know how to get good numbers on how much data the driver is
> processing; does anyone?
> There should be monitoring/tests in place to catch a regression where the
> driver begins writing 1000x the data to the event-log, or having to process
> 1000x the data over the event bus
> h3. Should this accumulator be disabled altogether?
> Seems like yes, to me. Making the driver churn through all this useless data
> seems unreasonable (short of a major refactoring of the driver to... offload
> things to threads?).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]