GitHub user ggevay opened a pull request: https://github.com/apache/flink/pull/1517
[FLINK-2237] [runtime] Add hash-based combiner.

This adds ReduceHashTable and modifies ReduceCombineDriver to use it instead of sorting. When an input element comes in, the driver probes the table for its key, and either inserts the element if the key is not present, or updates the stored element with one reduce step. When memory runs out, all the current elements (partial aggregates) in the table are emitted to the output.

I haven't yet deleted the code of the sort-based solution from the driver, but the hash-based solution seems to be always faster (sometimes by a factor of 3), so there shouldn't be a need to choose between them dynamically. I will run some more performance tests to confirm this, and then probably delete the sort-based code. The hash-based approach also has the advantage that the memory used is proportional to the number of distinct keys, rather than the number of input elements.

The interface of ReduceHashTable is a subset of the interface of CompactingHashTable, so in theory it could replace CompactingHashTable everywhere (which is only JoinWithSolutionSet and coGroupWithSolutionSet, if I'm not mistaken). I haven't actually tried this yet, but if you think it's worth a try, I will do it and run some real performance tests. I've already added ReduceHashTable to HashTablePerformanceComparison, and when there is plenty of memory available, ReduceHashTable is about 5-10% faster than CompactingHashTable. It also uses ~15% less memory, and it has the further advantage that if the sizes of the records don't change during updates, then no compactions are needed (because records are updated in place), so performance doesn't degrade as we get close to occupying all the memory. (In contrast, updates in CompactingHashTable start getting slow when there is little free memory left.) For details about the operation of ReduceHashTable, see the comment at the beginning of the file.

I've changed the constructor of RandomAccessInputView to not do a seek. (This behaviour is more practical for me, because I want to create an instance before allocating memory segments to it.) I verified at most usage sites that the code does an explicit seek after the constructor, but I couldn't see this in AbstractBlockResettableIterator, so I inserted an explicit seek there.

I fixed a small bug in HashTablePerformanceComparison: the *-1 was not doing anything, because that value was always 0 there.

The predefined aggregations (e.g. DataSet.sum()) currently use groupReduce. I'm not sure what the original reason for this was, but in light of this speedup of reduce, maybe it could be reconsidered. (The same goes for DataSet.distinct().)

Additionally, I have plans for a different hash table to handle the case where the serializer can tell the size of the records. This would allow open addressing with linear probing, which would avoid a few cache misses per element access, so I'm expecting maybe a 2x speedup from it. (This open-addressing hash table would actually be simpler than ReduceHashTable.) The difficulty is that the serializers are currently not very smart in this respect (I mean knowing the size), so some improvements would be needed for them, but I have some ideas for this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ggevay/flink hashedReduceSquashed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1517.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1517

----
commit d5eb20c10dcc875e6ae14f3298da829a2a4a5d61
Author: Gabor Gevay <gga...@gmail.com>
Date: 2015-12-04T14:28:23Z

    [FLINK-2237] [runtime] Add hash-based combiner.
----
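As an aside, the probe-or-update control flow of the combiner described above can be sketched roughly as follows. This is only an illustration: the real ReduceHashTable works on Flink's managed memory segments, whereas here a plain java.util.HashMap stands in for it, and the class and method names are mine, not from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustrative sketch of the hash-based combine loop: probe the table for
// each input element's key; insert if absent, otherwise apply one reduce
// step to the stored partial aggregate.
public class HashCombineSketch {

    public static <K, V> List<Map.Entry<K, V>> combine(
            Iterable<Map.Entry<K, V>> input, BinaryOperator<V> reduce) {
        Map<K, V> table = new HashMap<>();
        for (Map.Entry<K, V> e : input) {
            // merge() does exactly the probe-or-update step:
            // insert the value if the key is absent, otherwise
            // combine old and new with one reduce call.
            table.merge(e.getKey(), e.getValue(), reduce);
            // The real driver additionally handles the out-of-memory case
            // here: when an insert fails, it emits all partial aggregates
            // downstream and continues with an empty table.
        }
        return new ArrayList<>(table.entrySet());
    }
}
```

Note how the memory footprint of the table is bounded by the number of distinct keys seen so far, not by the number of input elements, which is the advantage claimed in the description.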
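The planned open-addressing table for fixed-size records could look roughly like the toy sketch below: records live directly in flat arrays, and a lookup walks consecutive slots (linear probing) instead of chasing pointers, which is where the hoped-for cache-miss savings come from. The class name, the use of int keys/values, and the sum-as-reduce choice are all mine for illustration; none of this is taken from the PR.

```java
// Toy open-addressing hash table with linear probing for fixed-size
// records (here: int key, int partial aggregate). All data sits in two
// flat arrays, so probing touches consecutive memory.
public class LinearProbingSketch {

    private static final int EMPTY = Integer.MIN_VALUE; // sentinel: free slot

    private final int[] keys;
    private final int[] values;

    public LinearProbingSketch(int capacity) {
        keys = new int[capacity];
        values = new int[capacity];
        java.util.Arrays.fill(keys, EMPTY);
    }

    /** Insert the value, or reduce it into the existing one (here: sum). */
    public void reduceInsert(int key, int value) {
        int i = Math.floorMod(Integer.hashCode(key), keys.length);
        // Linear probing: step to the next slot until we find the key or a
        // free slot. (A full table would have to flush its contents, as the
        // combiner does when memory runs out; this toy version assumes the
        // table never fills up completely.)
        while (keys[i] != EMPTY && keys[i] != key) {
            i = (i + 1) % keys.length;
        }
        if (keys[i] == EMPTY) {
            keys[i] = key;
            values[i] = value;
        } else {
            values[i] += value; // one reduce step, in place
        }
    }

    /** Returns the partial aggregate for the key, or null if absent. */
    public Integer get(int key) {
        int i = Math.floorMod(Integer.hashCode(key), keys.length);
        while (keys[i] != EMPTY) {
            if (keys[i] == key) {
                return values[i];
            }
            i = (i + 1) % keys.length;
        }
        return null;
    }
}
```

Because every record has the same size, an update never changes a record's footprint, so no compaction or relocation is ever needed; this only works when the serializer can report the record size up front, which is the serializer improvement the description mentions.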