Thanks for the pointer.

I have been reading the code and trying to understand how to create an 
efficient aggregate function, but I must be missing something, because it seems 
to me that any aggregation function that uses non-primitive types would have a 
high overhead.
Consider the following simple example: we have a column that contains the 
numbers 1-10, and we want to calculate a histogram of these values.
As an equivalent of the hand-written code in …, the trivial (student) solution 
would look something like this:
// col holds the column's values (1-10); array indices are 0-9
var hist = new int[10];
for (int v : col) {
  hist[v - 1] += 1;
}
As far as I understand, the problem is that Spark wouldn’t generate it this 
way. Instead I would need to express something like “update hist at position v 
by +1”, which in practice means the array is copied at least 3 times: first it 
is copied from its unsafe representation to a Scala sequence (even worse, since 
arrays always use offsets, the copy has to be done element by element instead of 
a single memcpy); then, since the array is immutable, a new version has to be 
created (by copying it and changing just the relevant element); and finally it 
is copied back to the unsafe representation.
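To make the cost concrete, here is a small plain-Java model (not Spark code) of the update pattern described above: an immutable-style increment that performs the three copies, next to an in-place increment. The int[] standing in for the "unsafe" buffer and the element-copy counter are assumptions made purely for illustration.

```java
import java.util.Arrays;

// Plain-Java model of the per-row update pattern; this is NOT Spark code.
// An int[] stands in for the "unsafe" buffer.
final class CopyCostModel {
    // Total number of array elements copied, to make the overhead visible.
    static long elementsCopied = 0;

    // Immutable-style increment: three full copies for a single +1.
    static int[] immutableIncrement(int[] unsafeBuffer, int bin) {
        int[] asSeq = copy(unsafeBuffer);   // 1. unsafe form -> object form
        int[] updated = copy(asSeq);        // 2. new "immutable" version...
        updated[bin] += 1;                  //    ...with one element changed
        return copy(updated);               // 3. object form -> unsafe form
    }

    // Mutable increment: touches exactly one element, no copies.
    static void mutableIncrement(int[] buffer, int bin) {
        buffer[bin] += 1;
    }

    private static int[] copy(int[] src) {
        elementsCopied += src.length;
        return Arrays.copyOf(src, src.length);
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 2, 3, 10, 10, 10};   // input values in 1..10
        int[] a = new int[10];
        int[] b = new int[10];
        for (int v : values) {
            a = immutableIncrement(a, v - 1);
            mutableIncrement(b, v - 1);
        }
        System.out.println(Arrays.equals(a, b));   // true
        System.out.println(elementsCopied);        // 210 = 7 rows * 3 copies * 10 bins
    }
}
```

With 10 bins, each row costs about 30 element copies in the immutable version versus a single write in the mutable one, and the gap grows linearly with the number of bins.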

I tried to look at examples in the code that have an intermediate buffer which 
is not a simple structure. Basically, I see two kinds of examples: distinct 
operations (which, if I understand correctly, somehow use an internal hashmap to 
hold the distinct values, but I can’t find the code that generates it) and 
collect functions (collect_list, collect_set), which do not appear to do any 
code generation but instead define their own buffer however they like (the 
buffer is not of a regular type).
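For reference, the distinct case can be modeled conceptually as a per-group hash set used as the aggregation buffer, with a union when partial buffers are merged. This is a hedged sketch in plain Java; the class and method names are hypothetical, and Spark's planner does not necessarily execute distinct aggregates this way.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual model only (hypothetical names): a hash set per group acts as the
// aggregation buffer for counting distinct values.
final class DistinctModel {
    // Per-row update: add the value to its group's set.
    static Map<String, Set<Integer>> update(Map<String, Set<Integer>> buf, String group, int value) {
        buf.computeIfAbsent(group, g -> new HashSet<>()).add(value);
        return buf;
    }

    // Merging partial buffers (e.g. from two partitions) is a per-group set union.
    static Map<String, Set<Integer>> merge(Map<String, Set<Integer>> left,
                                           Map<String, Set<Integer>> right) {
        for (Map.Entry<String, Set<Integer>> e : right.entrySet()) {
            left.computeIfAbsent(e.getKey(), g -> new HashSet<>()).addAll(e.getValue());
        }
        return left;
    }

    // Final result for one group: the size of its set.
    static int countDistinct(Map<String, Set<Integer>> buf, String group) {
        Set<Integer> s = buf.get(group);
        return s == null ? 0 : s.size();
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> p1 = new HashMap<>();
        update(p1, "g", 1);
        update(p1, "g", 1);
        Map<String, Set<Integer>> p2 = new HashMap<>();
        update(p2, "g", 2);
        System.out.println(countDistinct(merge(p1, p2), "g"));   // 2
    }
}
```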

So I was wondering what the right way to implement this logic efficiently is. 
I see two options:

1. Using a UDAF: in this case I would define the buffer to have 10 integer 
fields and manipulate each one. This solution suffers from two problems. First, 
it is slow (especially if there are other aggregations that use Spark SQL 
expressions). Second, it is limited: I can’t change the size of the array in 
the middle. For example, assume the above histogram is computed on a groupBy and 
I know beforehand that in 99% of the cases there are 3 values but in 1% of the 
cases there are 100 values. If I used an array, I would just switch to a bigger 
array the first time I see a value from the 100.

2. Implement something similar to collect_list and collect_set. According to the 
documentation of the Collect class, this uses the slower sort-based aggregation 
path because the number of elements cannot be determined in advance, even 
though in the basic case above we do know the size (although I am not sure how 
its performance would compare to the UDAF option). This appears to be simpler 
than a UDAF because I can use the data types I want directly; however, I can’t 
figure out how the code generation is done, as I do not see the relevant 
functions when running debugCodegen on the result.

I also believe there should be a third option, actually implementing the 
proper expression, but I have no idea how to do that.
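A hedged sketch of what option 2 might look like in plain Java, with hypothetical names that are not a Spark API: the buffer is an ordinary mutable int[] that is updated in place per row, starts small (3 bins here, following the 99% case above) and is replaced by a larger array only when a bin beyond the current capacity appears, which fixed UDAF buffer fields cannot do. Conversion to bytes happens only when a partial buffer leaves its partition, not once per row.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch, not a Spark API: a mutable, growable int[] histogram buffer.
final class HistogramAggregate {
    // Start small: most groups only see a few bins.
    static int[] createBuffer() { return new int[3]; }

    // Per-row update; bin is a 0-based index. Returns the buffer to keep using,
    // which is a new, larger array only when the bin does not fit.
    static int[] update(int[] buf, int bin) {
        if (bin >= buf.length) {
            // Grow once to fit (at least doubling); new slots are zero.
            buf = Arrays.copyOf(buf, Math.max(bin + 1, buf.length * 2));
        }
        buf[bin] += 1;
        return buf;
    }

    // Combine two partial buffers, which may have different lengths.
    static int[] merge(int[] left, int[] right) {
        int[] out = left.length >= right.length ? left : Arrays.copyOf(left, right.length);
        for (int i = 0; i < right.length; i++) out[i] += right[i];
        return out;
    }

    // Serialize a partial buffer once, e.g. before it is shuffled.
    static byte[] serialize(int[] buf) {
        ByteBuffer bb = ByteBuffer.allocate(4 * buf.length);
        for (int c : buf) bb.putInt(c);
        return bb.array();
    }

    static int[] deserialize(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        int[] buf = new int[bytes.length / 4];
        for (int i = 0; i < buf.length; i++) buf[i] = bb.getInt();
        return buf;
    }
}
```

Only the rare large group pays for the bigger array; the common 3-bin case never grows.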

Can anyone point me in the right direction?

From: rxin [via Apache Spark Developers List] 
Sent: Monday, September 19, 2016 12:23 AM
To: Mendelson, Assaf
Subject: Re: Memory usage for spark types

Take a look at UnsafeArrayData and UnsafeMapData.

On Sun, Sep 18, 2016 at 9:06 AM, assaf.mendelson <[hidden email]> wrote:
I am trying to understand how Spark types are kept in memory and accessed.
For example, I tried to look at the definitions of MapType and ArrayType, but I 
can’t seem to find the relevant code for their actual implementation.
I am trying to figure out how these two types are implemented to understand how 
they match my needs.
In general, a map appears to take the same space as two arrays, which is about 
double the naïve array implementation: if I have 1000 rows, each with a map from 
10K integers to 10K integers, caching the DataFrame shows a total of ~150MB (the 
naïve implementation with two arrays would take 1000*10000*(4+4) bytes, a total 
of ~80MB). I see the same size if I use two arrays.
Second, what would be the performance of updating the maps/arrays, given that 
they are immutable (i.e. some copying is required)?
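The naïve estimate above can be checked with a few lines of arithmetic. This assumes 4-byte int keys and values and treats 1MB as 10^6 bytes; the 150MB figure is the measured value quoted above, not something the code computes.

```java
// Back-of-the-envelope check of the naive size estimate.
final class MapSizeEstimate {
    static long naiveBytes(long rows, long entriesPerRow, int keyBytes, int valueBytes) {
        return rows * entriesPerRow * (keyBytes + valueBytes);
    }

    public static void main(String[] args) {
        long naive = naiveBytes(1000, 10_000, 4, 4);           // 80,000,000 bytes
        double observedMB = 150.0;                             // measured via caching, as reported
        System.out.println(naive / 1_000_000 + " MB naive");   // 80 MB naive
        System.out.printf("%.2fx overhead%n", observedMB / (naive / 1_000_000.0));
    }
}
```

The ratio comes out at roughly 1.9x, which matches the "about double" observation.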

The reason I am asking is that I wanted to write an aggregate function that 
calculates a variation of a histogram.
The most naïve solution would be a map from bin to count. But since the map is 
immutable, wouldn’t that cost a lot more?
An even further optimization would be a mutable array in which each key and 
value are combined into a single value (both key and value are ints in my case). 
Assuming the maximum number of bins is small (e.g. fewer than 10), it is often 
cheaper to just search the array for the right key (and in this case the data is 
expected to be significantly smaller than a map). In my case, most of the time 
(90%) there are fewer than 3 elements in the bin, and if I have more than 10 
bins I basically combine bins to reduce the number.
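A minimal sketch of the packed-array idea, assuming int bin keys and int counts: each (key, count) pair is packed into one long, lookups are a linear scan, and add returns false when the array is full so that the caller can combine bins. The class name is hypothetical, and the bin-combining step is left out because its policy is not specified here.

```java
// Hypothetical sketch: (key, count) int pairs packed into longs in one mutable array.
final class PackedHistogram {
    private final long[] slots;   // each slot: key in the high 32 bits, count in the low 32
    private int size = 0;

    PackedHistogram(int maxBins) { slots = new long[maxBins]; }

    private static long pack(int key, int count) {
        return ((long) key << 32) | (count & 0xFFFFFFFFL);
    }
    private static int keyOf(long slot) { return (int) (slot >>> 32); }
    private static int countOf(long slot) { return (int) slot; }

    // Linear scan for the key; returns false when a new key does not fit,
    // so the caller can combine bins (not shown) and retry.
    boolean add(int key) {
        for (int i = 0; i < size; i++) {
            if (keyOf(slots[i]) == key) {
                slots[i] = pack(key, countOf(slots[i]) + 1);
                return true;
            }
        }
        if (size == slots.length) return false;
        slots[size++] = pack(key, 1);
        return true;
    }

    int count(int key) {
        for (int i = 0; i < size; i++) {
            if (keyOf(slots[i]) == key) return countOf(slots[i]);
        }
        return 0;
    }

    int size() { return size; }
}
```

Each bin costs exactly 8 bytes in the array, and for fewer than 10 bins the scan touches at most a few cache lines.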

For few elements, a map becomes very inefficient: if I create 10M rows with one 
single-entry map from int to int each, I get an overall size of ~380MB, i.e. ~38 
bytes per element (instead of just 8). Arrays are also too large (229MB, i.e. 
~23 bytes per element).
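The per-element figures follow directly from the reported totals. A quick check, assuming one element per row and 1MB = 10^6 bytes:

```java
// Bytes per element implied by the reported cached sizes (10M rows, one element each).
final class PerElementCost {
    static double bytesPerElement(double totalMB, long elements) {
        return totalMB * 1_000_000 / elements;
    }

    public static void main(String[] args) {
        long rows = 10_000_000L;
        System.out.println(bytesPerElement(380, rows));   // 38.0 (map)
        System.out.println(bytesPerElement(229, rows));   // 22.9 (arrays)
        System.out.println(4 + 4);                        // 8 bytes for a raw int key + int value
    }
}
```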

Is there a way to implement a simple mutable array type to use in the 
aggregation buffer? Where is the code that actually handles these types?
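As a rough illustration of the kind of structure the UnsafeArrayData/UnsafeMapData pointer in this thread is about, here is a simplified model of a fixed-width int array stored in a single flat byte region and updated in place. The layout (a 4-byte element count followed by 4-byte values) and the class name are assumptions for illustration only; Spark's actual UnsafeArrayData format is different (it also tracks nulls) and may differ between versions.

```java
import java.nio.ByteBuffer;

// Simplified model, NOT Spark's layout:
// [numElements: 4 bytes][element 0: 4 bytes][element 1: 4 bytes]...
final class FlatIntArray {
    private final ByteBuffer region;

    FlatIntArray(int numElements) {
        region = ByteBuffer.allocate(4 + 4 * numElements);   // zero-initialized
        region.putInt(0, numElements);
    }

    int numElements() { return region.getInt(0); }

    private int offset(int i) {
        if (i < 0 || i >= numElements()) {
            throw new IndexOutOfBoundsException("index " + i);
        }
        return 4 + 4 * i;
    }

    int get(int i) { return region.getInt(offset(i)); }

    // In-place update: overwrites 4 bytes; nothing else is copied or moved.
    void set(int i, int value) { region.putInt(offset(i), value); }

    void increment(int i) { set(i, get(i) + 1); }

    int sizeInBytes() { return region.capacity(); }
}
```

Because every element has a fixed width, its offset is computable, so a histogram increment is a single 4-byte write into the region.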

View this message in context: Memory usage for spark 
Sent from the Apache Spark Developers List mailing list 
archive at
