Dear List,
We have run into serious problems when running a large number of aggregations 
in a GROUP BY query. Symptoms are OutOfMemory exceptions and unreasonably long 
processing times due to GC. The problem occurs when the following two 
conditions are met:
 - The number of groups is relatively large (growing with the size of the 
dataset)
 - The number of columns is relatively large
To reproduce, paste the following gist into your spark-shell (I'm running 
1.3.1): https://gist.github.com/DanielMe/9467bb0d9ad3aa639429
This example is relatively small in size:
 - The size of the input is 10^6 * 64 bit = 8 MB
 - The size of the output should be around 3 * 10^8 * 64 bit = 2.4 GB
 - The aggregations themselves are just "count(1)" and hence not so difficult 
to compute
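For reference, here is a rough sketch of the kind of query involved; the 
actual gist may differ in detail, and the parameter values below are purely 
illustrative (chosen to roughly match the sizes above):

```scala
// Hypothetical sketch, not the actual gist: num_rows, num_groups and
// num_columns are illustrative parameters.
import sqlContext.implicits._
import org.apache.spark.sql.functions._

val num_rows    = 1000000   // 10^6 input rows
val num_groups  = 50000     // distinct grouping keys
val num_columns = 6000      // number of count(1) aggregations

val df = sc.parallelize(0 until num_rows)
  .map(i => (i % num_groups, i))
  .toDF("key", "value")

// Build num_columns identical count(1) aggregation columns.
val aggs = (0 until num_columns).map(i => count(lit(1)).as(s"c$i"))

// GROUP BY with a large number of aggregations; forcing execution is what
// drives the workers into "GC overhead limit exceeded".
val result = df.groupBy("key").agg(aggs.head, aggs.tail: _*)
result.count()
```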
I am running this on a cluster with three 61GB worker machines and an equally 
equipped master with the following spark-defaults.conf:
spark.executor.memory=55g
spark.driver.memory=55g
The result: the workers choke with "java.lang.OutOfMemoryError: GC overhead 
limit exceeded". If you play with the num_columns parameter, you should 
observe an unreasonable amount of time spent on GC even for lower values. On a 
desktop machine, low values of num_columns should already lead to OOM crashes.
My questions are:
 - What causes this behaviour?
 - Can/should Catalyst automatically optimize queries of this kind to run in 
reasonable time, or at least not crash?
 - What are possible workarounds to achieve the desired effect? (Even if that 
means not using DataFrames but going down to the raw RDD level.)
Our preliminary analysis concluded that what is blowing up is the hashTable in 
Aggregate::doExecute, which tries to store the cross product of groups and 
aggregation columns on each partition. We managed to mitigate the issue 
somewhat by
 - reducing the size of the partitions (which makes these per-partition hash 
tables smaller)
 - pre-partitioning the data with a HashPartitioner on the key (which reduces 
the number of distinct groups per partition; a sketch follows below)
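For illustration, here is a rough sketch of the pre-partitioning workaround at 
the RDD level; the identifiers are only illustrative, and df and aggs are 
assumed to be defined as in the sketch above:

```scala
// Hedged sketch: pre-partition by the grouping key so that each partition
// only sees a subset of the groups, keeping the per-partition hash table small.
import org.apache.spark.HashPartitioner

// Key the rows by the grouping column and shuffle them with a HashPartitioner.
val keyed = df.rdd.map(row => (row.getInt(0), row))
val prePartitioned = keyed.partitionBy(new HashPartitioner(200)).values

// Rebuild a DataFrame on the pre-partitioned data; the subsequent GROUP BY
// now encounters far fewer distinct groups per partition.
val prePartitionedDF = sqlContext.createDataFrame(prePartitioned, df.schema)
val result = prePartitionedDF.groupBy("key").agg(aggs.head, aggs.tail: _*)
```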
The latter seems to be a sensible thing to do whenever 
num_columns * num_groups > num_rows, because in this setting the amount of 
data we have to shuffle around after the partial aggregation step is larger 
than the amount of data we had initially. Could this be something that 
Catalyst should take into account when creating a physical plan?
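Concretely, with the numbers above: the input is only 10^6 rows (8 MB), while 
the partial aggregates already approach num_groups * num_columns ≈ 3 * 10^8 
cells (about 2.4 GB), so the shuffle after partial aggregation is on the order 
of hundreds of times larger than the data that was read in.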
Thanks in advance.
Kind regards,

Daniel