Max Seiden created SPARK-7629:
---------------------------------
Summary: Unroll AggregateFunction "update" in the case of a single
input expression
Key: SPARK-7629
URL: https://issues.apache.org/jira/browse/SPARK-7629
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden
Priority: Minor
h3. Summary
This was observed on 1.2.1. Many of the AggregateFunctions take a list of
Expressions as input, wrap them in an InterpretedProjection, and evaluate that
projection to get the arguments for the actual aggregate expression. In the
case where there is only a single input expression, however, this leads to
gross inefficiencies.
Take CountDistinctFunction as an example, and assume it has a single input
expression of type String. In this case Spark uses an OpenHashSet[Any] to
collect a distinct set of GenericRow(Array[Any](string)) instances. This is
hugely inefficient: every String object must be wrapped first in an Array and
then in a GenericRow, and then inserted into the OpenHashSet, where all hashing
and equality comparisons happen on the GenericRow.
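To make the wrapping concrete, here is a minimal, hypothetical stand-in for
Spark's GenericRow (and a plain mutable.HashSet standing in for
OpenHashSet[Any]) that sketches the allocations on the current path:

```scala
// Hypothetical, simplified stand-in for Spark's GenericRow: hashing and
// equality go through the row wrapper, never the underlying String directly.
final class GenericRow(val values: Array[Any]) {
  override def hashCode: Int = {
    var h = 0
    var i = 0
    while (i < values.length) { h = 31 * h + values(i).hashCode; i += 1 } // loop per call
    h
  }
  override def equals(o: Any): Boolean = o match {          // pattern match per call
    case r: GenericRow =>
      values.length == r.values.length &&
        values.indices.forall(i => values(i) == r.values(i)) // element-by-element
    case _ => false
  }
}

// Current path: two allocations (Array[Any] + GenericRow) per input String,
// even when the value is already present in the set.
val seen = scala.collection.mutable.HashSet.empty[GenericRow]
for (ip <- Seq("10.0.0.1", "10.0.0.1", "10.0.0.2"))
  seen += new GenericRow(Array[Any](ip))
assert(seen.size == 2)
```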
This means that any single OpenHashSet entry carries unnecessary overhead from
the GenericRow and Array[Any], and all hashCode and equality operations must go
through the Row. In the case of hashCode, every invocation requires a while
loop and a pattern match. In the case of equality, Seq[Any].equals is used,
which requires (for both the candidate and existing objects) a pattern match, a
call to "canEqual", and a call to "sameElements" - the last of these (in
IterableLike, as far as I can tell) constructs a Scala Iterator over the
Array[Any] and does element-by-element comparisons.
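The generic Seq[Any] equality path described above can be seen in miniature
(this is standard Scala collections behavior, not Spark-specific):

```scala
// Seq equality is structural: == dispatches through canEqual and
// sameElements, the latter walking an Iterator over the elements.
val a: Seq[Any] = Seq("10.0.0.1")
val b: Seq[Any] = Seq("10.0.0.1")
assert(a == b)             // true, via canEqual + sameElements
assert(a.sameElements(b))  // the element-by-element walk itself
```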
Needless to say, this burns far too many cycles in the case of a single input
Expression, carries a high and unnecessary memory overhead, and generates a ton
of garbage. To give a concrete example, I am unable to compute a grand distinct
over an input of 15M unique IP addresses in a 2GB JVM. After a few seconds of
running, substantial GCs begin, and the JVM eventually gets stuck doing full
GCs. Additionally, a profile of the executor thread shows that 85% of the time
is spent rehashing (perhaps explainable at that scale), but that 65% of the
time is spent in GenericRow.hashCode.
My proposed improvement is to unroll these aggregate functions in the case of a
single input expression so that the inefficiencies described above are avoided
altogether. The only tricky bits are dealing with NULLs (the Row currently does
that for us) and efficiently handling both the single-input and multi-input
cases within the same aggregate function impl.
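A hedged sketch of what the unrolled single-input path might look like (the
class name SingleInputCountDistinct and its shape are hypothetical, not Spark
API; a plain mutable.HashSet again stands in for OpenHashSet[Any]):

```scala
import scala.collection.mutable

// Hypothetical sketch of an unrolled single-input distinct-count update.
// Instead of projecting the input into a GenericRow(Array[Any](v)), we
// evaluate the lone expression and insert the raw value directly, handling
// NULL ourselves since there is no Row to do it for us.
final class SingleInputCountDistinct {
  private val seen = mutable.HashSet.empty[Any] // stand-in for OpenHashSet[Any]

  def update(value: Any): Unit =
    if (value != null) seen += value // hash/equality on the value itself

  def result: Long = seen.size.toLong
}

val f = new SingleInputCountDistinct
Seq("10.0.0.1", "10.0.0.2", null, "10.0.0.1").foreach(f.update)
assert(f.result == 2L)
```

With this shape, hashing and equality for a String input hit String.hashCode
and String.equals directly, with no per-value Array or Row allocation.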
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)