That is a lot of columns, but I'm not sure that is why it is running out of
memory. In Spark SQL we do not yet do external (spill-to-disk) aggregation
when the number of keys in the aggregation hash map is large. We will fix
this and have external aggregation in 1.5.


On Tue, May 19, 2015 at 2:43 AM, daniel.mescheder <
daniel.mesche...@realimpactanalytics.com> wrote:

> Dear List,
>
> We have run into serious problems trying to run a larger-than-average
> number of aggregations in a GROUP BY query. The symptoms are OutOfMemory
> exceptions and unreasonably long processing times due to GC. The problem
> occurs when the following two conditions are met:
>
>  - The number of groups is relatively large (growing with the size of the
> dataset)
>  - The number of columns is relatively large
>
> To reproduce, paste the following gist into your spark-shell (I'm running
> 1.3.1): https://gist.github.com/DanielMe/9467bb0d9ad3aa639429
>
> This example is relatively small in size:
>
>  - The size of the input is 10^6 * 64 bit = 8 MB
>  - The size of the output should be around 3 * 10^8 * 64 bit = 2.4 GB
>  - The aggregations themselves are just "count(1)" and hence not so
> difficult to compute
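>
> For reference, here is a rough sketch of the kind of query this boils down
> to, runnable in the 1.3 spark-shell (where sc and sqlContext are already
> defined). The numbers and names are illustrative, not the literal contents
> of the gist:
>
> import org.apache.spark.sql.functions.{count, lit}
> import sqlContext.implicits._
>
> val numRows    = 1000000   // 10^6 rows of 64-bit keys, i.e. about 8 MB of input
> val numColumns = 300       // the num_columns knob mentioned below
>
> val df = sc.parallelize(1 to numRows)
>   .map(i => Tuple1(i.toLong))   // roughly one group per input row
>   .toDF("key")
>
> // num_columns independent count(1) aggregations in a single GROUP BY
> val aggs = (1 to numColumns).map(i => count(lit(1)).as(s"c$i"))
> val result = df.groupBy("key").agg(aggs.head, aggs.tail: _*)
> result.count()   // forces the aggregation; this is where the GC pressure shows up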
>
> I am running this on a cluster with three 61GB worker machines and an
> equally equipped master with the following spark-defaults.conf:
>
> spark.executor.memory=55g
> spark.driver.memory=55g
>
> The result: The workers will choke with "java.lang.OutOfMemoryError: GC
> overhead limit exceeded". In fact, if you play with the num_columns
> parameter you should observe an unreasonable amount of time spent on GC
> even for lower values. If you run this on a desktop machine, low values of
> num_columns should already lead to OOM crashes.
>
> My questions are:
>
>  - What causes this behaviour?
>  - Can/should Catalyst automatically optimize queries of this kind so that
> they run in reasonable time, or at least do not crash?
>  - What are possible workarounds to achieve the desired effect? (Even if
> that means not using DataFrames but dropping down to the raw RDD level.)
>
> Our preliminary analysis concluded that what is blowing up is the hashTable
> in Aggregate::doExecute, which tries to store the cross product of groups
> and columns on each partition. We managed to mitigate the issue somewhat by
>
>  - reducing the size of the partitions (which will make these hash tables
> smaller)
>  - pre-partitioning the data using a HashPartitioner on the key (which
> will reduce the number of different groups per partition)
>
> The latter seems a sensible thing to do whenever num_columns * num_groups >
> num_rows, because in that setting the amount of data we have to shuffle
> around after the first (partial) aggregation step is larger than the amount
> of data we started with. Could this be something that Catalyst should take
> into account when creating a physical plan?
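>
> To make the second workaround concrete, this is roughly what we do (a
> sketch only; the partition count, the key column index and the variable
> names are illustrative, and df / aggs refer to the same DataFrame and list
> of count(1) columns as in the sketch above):
>
> import org.apache.spark.HashPartitioner
>
> // Key each row by the GROUP BY column and hash-partition by that key, so
> // every group ends up in exactly one partition and the per-partition hash
> // tables stay small.
> val keyed = df.rdd.map(row => (row.getLong(0), row))
> val partitioned = keyed.partitionBy(new HashPartitioner(600))
>
> // Rebuild a DataFrame with the original schema and aggregate as before.
> val repartitionedDf = sqlContext.createDataFrame(partitioned.values, df.schema)
> val result2 = repartitionedDf.groupBy("key").agg(aggs.head, aggs.tail: _*)
> result2.count()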
>
> Thanks in advance.
>
> Kind regards,
>
>
> Daniel
>
