GitHub user ggevay opened a pull request:
https://github.com/apache/flink/pull/1517
[FLINK-2237] [runtime] Add hash-based combiner.
This adds ReduceHashTable, and modifies ReduceCombineDriver to use it
instead of sorting. When an input element comes in, the driver probes the table
for its key: if the key is not present, it inserts the element; otherwise, it
updates the stored element with one reduce step. When memory runs out, all the
current elements (partial aggregates) in the table are emitted to the output.
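For illustration (not part of the PR), here is a minimal sketch of the
insert-or-reduce control flow described above. The real ReduceHashTable works
on Flink's managed MemorySegments; below, a plain HashMap and a maxEntries
threshold stand in for the managed memory, and all names are hypothetical:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BinaryOperator;
    import java.util.function.Consumer;
    import java.util.function.Function;

    // Simplified stand-in for the hash-based combine strategy: a HashMap plays
    // the role of the managed-memory hash table, and maxEntries plays the role
    // of running out of memory segments.
    public class HashCombineSketch<K, T> {
        private final Function<T, K> keyExtractor;
        private final BinaryOperator<T> reduceFunction;
        private final int maxEntries;
        private final Map<K, T> table = new HashMap<>();

        public HashCombineSketch(Function<T, K> keyExtractor,
                                 BinaryOperator<T> reduceFunction,
                                 int maxEntries) {
            this.keyExtractor = keyExtractor;
            this.reduceFunction = reduceFunction;
            this.maxEntries = maxEntries;
        }

        public void processElement(T element, Consumer<T> output) {
            K key = keyExtractor.apply(element);
            T existing = table.get(key);
            if (existing != null) {
                // key already present: one reduce step updates the partial aggregate
                table.put(key, reduceFunction.apply(existing, element));
            } else {
                if (table.size() >= maxEntries) {
                    // "memory runs out": emit all partial aggregates and start over
                    flush(output);
                }
                table.put(key, element);
            }
        }

        public void flush(Consumer<T> output) {
            table.values().forEach(output);
            table.clear();
        }
    }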
I haven't yet deleted the code of the sort-based solution from the driver,
but the hash-based solution seems to be always faster (sometimes by a factor
of 3), so there is probably no need to choose between the two dynamically. I
will run some more performance tests to confirm this, and then probably delete
the sort-based code. The hash-based approach also has the advantage that the
memory used is proportional to the number of distinct keys rather than to the
number of input elements.
The interface of ReduceHashTable is a subset of the interface of
CompactingHashTable, so in theory it could replace CompactingHashTable
everywhere (which, if I'm not mistaken, is only JoinWithSolutionSet and
coGroupWithSolutionSet). I haven't actually tried this yet, but if you think
it is worth a try, I will do it and run some real performance tests. I've
already added ReduceHashTable to HashTablePerformanceComparison, and when
plenty of memory is available, ReduceHashTable is about 5-10% faster than
CompactingHashTable. It also uses ~15% less memory, and it has the further
advantage that if the sizes of the records don't change during updates, then
no compaction is needed (because records are updated in place), so performance
doesn't degrade as we get close to occupying all the memory. (In contrast,
updates to CompactingHashTable start getting slow when there is little free
memory left.)
For details about the operation of ReduceHashTable, see the comment at the
beginning of the file.
I've changed the constructor of RandomAccessInputView to not do a seek.
(This behaviour is more practical for me, because I want to create an instance
before allocating memory segments to it.) At most usage sites I verified that
the code does an explicit seek after the constructor, but I couldn't see one
in AbstractBlockResettableIterator, so I inserted an explicit seek there.
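For clarity, a hedged sketch of the resulting calling convention (the exact
constructor and method signatures are assumptions based on the current
flink-runtime code):

    import java.util.ArrayList;
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.io.disk.RandomAccessInputView;

    public class SeekAfterCtorExample {
        static RandomAccessInputView create(ArrayList<MemorySegment> segments, int segmentSize) {
            // The constructor no longer seeks, so the instance can be created
            // before any memory segments are added to it...
            RandomAccessInputView view = new RandomAccessInputView(segments, segmentSize);
            // ...and the caller positions the view explicitly before the first
            // read, as is now done in AbstractBlockResettableIterator.
            view.setReadPosition(0);
            return view;
        }
    }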
I fixed a small bug in HashTablePerformanceComparison: the * -1 had no
effect, because the value it was applied to was always 0 there.
The predefined aggregations (e.g. DataSet.sum()) currently use groupReduce.
I'm not sure what the original reason for this was, but in light of this
speedup of reduce, maybe it could be reconsidered. (And the same goes for
DataSet.distinct().)
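To illustrate what this would mean, here is a sum over a grouped DataSet
expressed directly as a reduce, which is the form that would benefit from the
hash-based combiner (the tuple types are just an example):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class SumViaReduceExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<String, Integer>> input = env.fromElements(
                    new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

            // Semantically the same as input.groupBy(0).sum(1), but expressed
            // as a reduce, so it goes through the (now hash-based) combiner.
            DataSet<Tuple2<String, Integer>> sums = input
                    .groupBy(0)
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                              Tuple2<String, Integer> b) {
                            return new Tuple2<>(a.f0, a.f1 + b.f1);
                        }
                    });

            sums.print();
        }
    }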
Additionally, I plan to build a different hash table for the case where the
serializer can tell the size of the records in advance. This would allow open
addressing with linear probing, which would avoid a few cache misses per
element access, so I'm expecting maybe a 2x speedup from it. (This open
addressing hash table would actually be simpler than ReduceHashTable.) The
difficulty is that the serializers currently cannot generally report record
sizes, so some improvements to them would be needed, but I have some ideas
for this.
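Again just to sketch the planned direction (everything below is hypothetical
and not part of this PR): with fixed-size records, open addressing with linear
probing can keep keys and records in flat arrays, so a probe scans consecutive
slots. Here long keys and long values stand in for fixed-size records, and
resizing/flushing is omitted:

    import java.util.Arrays;

    // Hypothetical open-addressing table with linear probing for fixed-size
    // records (here: long keys and long values). A lookup walks consecutive
    // array slots, which is cache-friendly compared to chained buckets.
    public class LinearProbingSketch {
        private static final long EMPTY = Long.MIN_VALUE; // reserved sentinel key
        private final long[] keys;
        private final long[] values;
        private final int mask;
        private int size;

        public LinearProbingSketch(int capacityPow2) { // capacity must be a power of 2
            keys = new long[capacityPow2];
            values = new long[capacityPow2];
            Arrays.fill(keys, EMPTY);
            mask = capacityPow2 - 1;
        }

        // Inserts the key/value, or combines it with the existing value in one
        // reduce step (summing, for illustration). Returns false when the table
        // is full, which is where the driver would flush the partial aggregates.
        public boolean insertOrReduce(long key, long value) {
            int pos = hash(key) & mask;
            while (keys[pos] != EMPTY) {
                if (keys[pos] == key) {
                    values[pos] += value; // in-place update; record size never changes
                    return true;
                }
                pos = (pos + 1) & mask; // linear probing: try the next slot
            }
            if (size == mask) { // keep at least one slot empty so probing terminates
                return false;
            }
            keys[pos] = key;
            values[pos] = value;
            size++;
            return true;
        }

        private static int hash(long k) { // 64-bit mix (MurmurHash3 finalizer)
            k ^= k >>> 33;
            k *= 0xff51afd7ed558ccdL;
            return (int) (k ^ (k >>> 33));
        }
    }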
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ggevay/flink hashedReduceSquashed
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1517.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1517
----
commit d5eb20c10dcc875e6ae14f3298da829a2a4a5d61
Author: Gabor Gevay <[email protected]>
Date: 2015-12-04T14:28:23Z
[FLINK-2237] [runtime] Add hash-based combiner.
----