GitHub user ggevay opened a pull request:

    https://github.com/apache/flink/pull/1517

    [FLINK-2237] [runtime] Add hash-based combiner.

    This adds ReduceHashTable, and modifies ReduceCombineDriver to use it 
instead of sorting. When an input element comes in, the driver probes the table 
for its key, and either inserts the element (if the key is not present) or 
combines it with the stored element in one reduce step. When memory runs out, 
all the current elements (partial aggregates) in the table are sent to the 
output.
    
    I haven't yet deleted the code of the sort-based solution from the driver, 
but the hash-based solution seems to be consistently faster (sometimes by a 
factor of 3), so there doesn't seem to be a need to choose between them 
dynamically. I will run some more performance tests to confirm this, and then 
probably delete the sort-based code. The hash-based approach also has the 
advantage that the memory used is proportional to the number of distinct keys, 
rather than the number of input elements.
    
    The interface of ReduceHashTable is a subset of the interface of 
CompactingHashTable, so in theory it could replace CompactingHashTable 
everywhere (which is only JoinWithSolutionSet and coGroupWithSolutionSet, if 
I'm not mistaken). I haven't actually tried this yet, but if you think it is 
worth giving it a go, I will try it and run some real performance tests. I've 
already added ReduceHashTable to HashTablePerformanceComparison, and when 
plenty of memory is available, ReduceHashTable is about 5-10% faster than 
CompactingHashTable. It also uses ~15% less memory, and it has the further 
advantage that if the sizes of the records don't change during updates, then 
no compaction is needed (because records are updated in place), so performance 
doesn't degrade when we are close to occupying all the memory. (In contrast, 
CompactingHashTable's updates start getting slow when there is little free 
memory left.)
    
    For details about the operation of ReduceHashTable, see the comment at the 
beginning of the file.
    
    I've changed the ctor of RandomAccessInputView to not do a seek. (This 
behaviour is more practical for me, because I want to create an instance 
before allocating memory segments to it.) I verified that at most usage sites 
the code does an explicit seek after the ctor, but I couldn't see one in 
AbstractBlockResettableIterator, so I inserted an explicit seek there.
    
    I fixed a small bug in HashTablePerformanceComparison: the *-1 was not 
doing anything, because the value it multiplied was always 0 there.
    
    The predefined aggregations (e.g. DataSet.sum()) currently use 
groupReduce. I'm not sure what the original reason for this was, but in light 
of this speedup of reduce, maybe it could be reconsidered. (The same holds for 
DataSet.distinct().)
    
    Additionally, I have plans for a different hash table to handle the case 
when the serializer can tell the size of the records. This would allow open 
addressing with linear probing, which would avoid a few cache misses per 
element access, so I'm expecting maybe a 2x speedup from it. (This 
open-addressing hash table would actually be simpler than ReduceHashTable.) 
The difficulty is that the serializers are currently not very smart in this 
respect (i.e., knowing the record size), so they would need some improvements, 
but I have some ideas for this.
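    As a rough illustration of the open-addressing idea: with fixed-size 
records (modeled here as primitive long key/value pairs), the table can be 
flat arrays scanned with linear probing, and updates happen in place. This 
sketch is not the proposed implementation (a real table would also handle 
resizing and a full table, omitted here), and the reduce step is hard-coded to 
summation:

```java
// Illustrative open-addressing table with linear probing over flat arrays.
// Fixed-size slots mean each probe touches consecutive memory, avoiding the
// pointer-chasing cache misses of a chained or linked layout.
public class LinearProbingTable {
    private final long[] keys;
    private final long[] values;
    private final boolean[] occupied;

    public LinearProbingTable(int capacity) {
        keys = new long[capacity];
        values = new long[capacity];
        occupied = new boolean[capacity];
    }

    // Insert the value, or fold it into the existing partial aggregate
    // in place (here the reduce step is a sum). Resizing when the table
    // fills up is omitted for brevity.
    public void update(long key, long value) {
        int i = Math.floorMod(Long.hashCode(key), keys.length);
        while (occupied[i] && keys[i] != key) {
            i = (i + 1) % keys.length; // linear probing: scan the next slot
        }
        if (occupied[i]) {
            values[i] += value; // in-place reduce step
        } else {
            occupied[i] = true;
            keys[i] = key;
            values[i] = value;
        }
    }

    // Look up the partial aggregate for a key; 0 if the key is absent.
    public long get(long key) {
        int i = Math.floorMod(Long.hashCode(key), keys.length);
        while (occupied[i]) {
            if (keys[i] == key) {
                return values[i];
            }
            i = (i + 1) % keys.length;
        }
        return 0;
    }
}
```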

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ggevay/flink hashedReduceSquashed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1517
    
----
commit d5eb20c10dcc875e6ae14f3298da829a2a4a5d61
Author: Gabor Gevay <gga...@gmail.com>
Date:   2015-12-04T14:28:23Z

    [FLINK-2237] [runtime] Add hash-based combiner.

----

