[ 
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Williams updated SPARK-21682:
----------------------------------
    Description: 
h3. Summary

* {{sc.parallelize(1 to 100000, 100000).cache.count}} causes a driver GC stall 
midway through on every configuration and version I've tried in 2.x.
* It runs fine with no Full GCs as of 1.6.3
* I think that {{internal.metrics.updatedBlockStatuses}} is the culprit; it 
breaks a contract about how large accumulators' values can be, in big-O terms:
** each value is of size O(P), where P is the number of partitions in a cached 
RDD
** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of O(P)
** ⇒ the driver must also process O(P*E) data every 10s from 
{{ExecutorMetricsUpdate}} events (where E is the number of executors; cf. 
{{spark.executor.heartbeatInterval}})
* when operating on a 100k-partition cached RDD, the driver enters a GC loop 
due to all the allocations it must do to process {{ExecutorMetricsUpdate}} and 
{{TaskEnd}} events with {{updatedBlockStatuses}} attached
* this metric should be disabled, or some ability to blacklist it from the 
command-line should be added.
* [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed one 
part of this - the event-log size had exploded - but the root problem still 
exists / is worse
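
As a rough illustration of the big-O claims above, here is a back-of-envelope sketch in plain Scala (no Spark involved). The object and method names are hypothetical, the constant factors are taken as 1, and the 500-executor figure is the cluster size mentioned later in this description; it only shows how the totals scale, not what the driver actually allocates.

{code:scala}
// Back-of-envelope model of the scaling claims; this is not Spark code.
object BlockStatusEstimate {
  // Total block statuses seen across all TaskEnd events, assuming each of
  // the P tasks reports perTask(P) statuses (constant factor assumed to be 1).
  def totalFromTaskEnds(partitions: Long, perTask: Long => Long): Long =
    partitions * perTask(partitions)

  // Block statuses per heartbeat round, assuming each of E executors
  // reports O(P) statuses (again with constant factor 1).
  def perHeartbeatRound(partitions: Long, executors: Long): Long =
    partitions * executors

  def main(args: Array[String]): Unit = {
    val p = 100000L
    println(totalFromTaskEnds(p, _ => 1L))  // O(1) per task: 100000
    println(totalFromTaskEnds(p, identity)) // O(P) per task: 10000000000
    println(perHeartbeatRound(p, 500L))     // 50000000
  }
}
{code}

Under these assumptions, going from O(1) to O(P) statuses per task turns 10^5 items into 10^10 over the life of the job.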

h3. {{count}} a 100k-partition RDD: works fine without {{.cache}}

In Spark 2.2.0 or 2.1.1:

{code}
spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc"
scala> val rdd = sc.parallelize(1 to 100000, 100000)
scala> rdd.count
{code}

In YARN and local modes, this finishes in ~20 seconds with ~20 partial GCs 
logged, all taking under 0.1s ([example 
output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]);
 all is well!

h3. {{count}} a 100k-partition cached RDD: GC-dies

If we {{cache}} the RDD first, the same {{count}} job quickly sends the driver 
into a GC death spiral: Full GCs start after a few thousand tasks and increase 
in frequency and length until they last minutes / become continuous (and, in 
YARN, the driver loses contact with any executors).
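
For reference, the cached variant is the one-liner from the Summary split into two steps, run in a {{spark-shell}} started with the same GC-logging flags as in the previous section (the name {{cached}} is just illustrative):

{code:scala}
// Same 100k-partition RDD as before, but marked for caching before the count
scala> val cached = sc.parallelize(1 to 100000, 100000).cache
scala> cached.count
{code}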

Example outputs: 
[local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], 
[YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies].

The YARN example removes any confusion about whether the storing of the blocks 
is causing memory pressure on the driver; the driver is basically doing no work 
except receiving executor updates and events, and yet it becomes overloaded. 

h3. Can't effectively throw driver heap at the problem

I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd 
expect: delay the first Full GC, and make Full GCs longer when they happen. 

I don't have a clear sense of whether the onset is linear or quadratic (i.e. do 
I get twice as far into the job before the first Full GC with a 20GB heap as 
with a 10GB heap, or only sqrt(2) times as far?).

h3. Mostly memory pressure, not OOMs

An interesting note is that I'm rarely seeing OOMs as a result of this, even on 
small heaps.

I think this is consistent with the idea that all this data is being 
immediately discarded by the driver, as opposed to kept around to serve web UIs 
or somesuch.

h3. Eliminating {{ExecutorMetricsUpdate}}s doesn't seem to help
Interestingly, setting large values of {{spark.executor.heartbeatInterval}} 
doesn't seem to mitigate the problem; GC-stall sets in at about the same point 
in the {{count}} job.

This implies that, in this example, the {{TaskEnd}} events are doing most or 
all of the damage.

h3. CMS helps but doesn't solve the problem
In some rough testing, I saw the {{count}} get about twice as far before dying 
when using the CMS collector.

h3. What bandwidth do we expect the driver to process events at?

IIUC, every 10s the driver gets O(P) (~100k?) block updates from each of ~500 
executors, and allocating objects for these updates is pushing it over a 
tipping point where it can't keep up. 

I don't know how to get good numbers on how much data the driver is processing; 
does anyone?
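
One possible way to get a rough number is a listener that tallies how many block statuses arrive on {{TaskEnd}} events. The sketch below is untested against this workload and rests on several assumptions: {{BlockStatusCounter}} is a hypothetical helper (not part of Spark), it counts entries rather than bytes, it is meant to be pasted into {{spark-shell}} (e.g. via {{:paste}}), and the 10k-partition size is an arbitrary smaller value chosen so the driver has a chance of finishing the job.

{code:scala}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper: counts block statuses attached to TaskEnd events.
class BlockStatusCounter extends SparkListener {
  val tasks = new AtomicLong(0)
  val statuses = new AtomicLong(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null if the task has failed
    Option(taskEnd.taskMetrics).foreach { m =>
      tasks.incrementAndGet()
      statuses.addAndGet(m.updatedBlockStatuses.size.toLong)
    }
  }
}

val counter = new BlockStatusCounter
sc.addSparkListener(counter)

// Smaller than the 100k repro (assumed size, adjust as needed)
sc.parallelize(1 to 10000, 10000).cache.count

// The listener bus is asynchronous, so these totals may lag slightly
println(s"tasks=${counter.tasks.get} statuses=${counter.statuses.get}")
{code}

Comparing the {{statuses}}/{{tasks}} ratio at a few partition counts would indicate whether the per-task payload really grows with P.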

There should be monitoring/tests in place to catch a regression where the 
driver begins writing 1000x the data to the event-log, or having to process 
1000x the data over the event bus.

h3. Should this accumulator be disabled altogether?

Seems like yes, to me. Making the driver churn through all this useless data 
seems unreasonable (short of a major refactoring of the driver to... offload 
things to threads?).


> Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
> --------------------------------------------------------------------
>
>                 Key: SPARK-21682
>                 URL: https://issues.apache.org/jira/browse/SPARK-21682
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.2, 2.1.1, 2.2.0
>            Reporter: Ryan Williams


