Dear List,

We have run into serious problems trying to run a larger-than-average number of aggregations in a GROUP BY query. Symptoms are OutOfMemory exceptions and unreasonably long processing times due to GC. The problem occurs when the following two conditions are met:

- The number of groups is relatively large (growing with the size of the dataset)
- The number of columns is relatively large

To reproduce, paste the following gist into your spark-shell (I'm running 1.3.1): https://gist.github.com/DanielMe/9467bb0d9ad3aa639429 (a sketch approximating its setup is included at the end of this message).

This example is relatively small in size:

- The size of the input is 10^6 * 64 bit = 8 MB
- The size of the output should be around 3 * 10^8 * 64 bit = 2.4 GB
- The aggregations themselves are just "count(1)" and hence not difficult to compute

I am running this on a cluster with three 61 GB worker machines and an equally equipped master, with the following spark-defaults.conf:

spark.executor.memory=55g
spark.driver.memory=55g

The result: the workers choke with "java.lang.OutOfMemoryError: GC overhead limit exceeded". In fact, if you play with the num_columns parameter, you should observe an unreasonable amount of time spent on GC even for lower values. On a desktop machine, low values of num_columns should already lead to OOM crashes.

My questions are:

- What causes this behaviour?
- Can/should catalyst automatically optimize queries of this kind so that they run in reasonable time, or at least don't crash?
- What are possible workarounds to achieve the desired effect? (Even if that means not using DataFrames but going down to the raw RDD level.)

Our preliminary analysis concluded that what is blowing up is in fact the hash table in Aggregate::doExecute, which tries to store the cross product of groups and columns on each partition. We managed to mitigate the issue a bit by

- reducing the size of the partitions (which makes these hash tables smaller), and
- pre-partitioning the data using a HashPartitioner on the key (which reduces the number of different groups per partition);

sketches of both workarounds follow at the end of this message. The latter actually seems to be a sensible thing to do whenever num_columns * num_groups > num_rows, because in that setting the amount of data we have to shuffle around after the first aggregation step is larger than the amount of data we had initially. Could this be something that catalyst takes into account when creating a physical plan?
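For concreteness, here is a minimal sketch that approximates the setup described above. It is not the gist itself; the parameter values, the column names, and the key-modulo-num_groups construction are my illustrative assumptions, chosen to match the sizes quoted earlier (10^6 input rows, num_groups * num_columns ~ 3 * 10^8 output cells):

// Spark 1.3.1 spark-shell: sc and sqlContext are predefined.
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Illustrative parameters.
val num_rows    = 1000000L
val num_groups  = 1000000L
val num_columns = 300

// A single 64-bit key column with num_groups distinct values.
val df = sc.parallelize(0L until num_rows).map(i => Tuple1(i % num_groups)).toDF("key")

// num_columns aggregate expressions, each just count(1).
val aggs = (1 to num_columns).map(i => count(lit(1)).as(s"c$i"))

// This is the query whose per-partition hash tables blow up.
val result = df.groupBy($"key").agg(aggs.head, aggs.tail: _*)
result.rdd.count()  // force execution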
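Workaround sketch 1 (smaller partitions), reusing df and aggs from the sketch above. The value 2000 is illustrative, not a recommendation, and I'm assuming "partition size" maps onto both the input partitioning and the standard spark.sql.shuffle.partitions setting:

// More (hence smaller) input partitions => smaller hash tables during the
// per-partition partial aggregation in Aggregate::doExecute.
val finer = df.repartition(2000)
val result1 = finer.groupBy($"key").agg(aggs.head, aggs.tail: _*)

// Raising the shuffle parallelism similarly shrinks the hash tables of the
// final aggregation step (the default is 200).
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")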
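Workaround sketch 2 (pre-partitioning with a HashPartitioner on the key at the RDD level), so that each input partition holds only a subset of the groups before the partial aggregation runs. This assumes the key is the Long in column 0; the "dummy" column and the partition count are again illustrative:

import org.apache.spark.HashPartitioner

// Hash-partition by key so each group lives in exactly one input partition,
// then rebuild a DataFrame and run the same aggregation as before.
val prePartitioned = df.rdd
  .map(r => (r.getLong(0), 1L))
  .partitionBy(new HashPartitioner(2000))
  .toDF("key", "dummy")

val result2 = prePartitioned.groupBy($"key").agg(aggs.head, aggs.tail: _*)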
Thanks in advance.

Kind regards,
Daniel