Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1517#issuecomment-182398031
Hi @ggevay, sorry it took me so long to review your PR.
As I said before, this is a very desirable feature and a solid
implementation.
I think a few things can be improved. In particular, the full stop to resize,
rebuild, or emit records might take a long time, depending on the size of the table.
I have the following suggestions / ideas:
- Split the single record area into multiple partitions (i.e., use multiple
`RecordArea`s). Each partition holds the data of multiple buckets. This allows
restricting rebuilding, compaction, etc. to only a part of the whole table. In
fact, this is what I meant in my initial comment about tracking the updates of
buckets (which I confused with partitions...). Partitioning is also used in the
other hash tables in Flink. It can also make your implementation more
suitable for the final reduce case, because it allows spilling individual
partitions to disk. In `CompactingHashTable` the max number of partitions is
set to `32`.
- Should we think about [linear
hashing](https://en.wikipedia.org/wiki/Linear_hashing) for resizing the table?
This technique grows the table by splitting individual buckets without the need
to reorganize the whole table.
- Do you think it is possible to extract the `ReduceFunction` from the table?
IMO this would be a cleaner design if we want to use the table instead of the
`CompactingHashTable`.
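To illustrate the partitioning idea, here is a minimal, hypothetical sketch (the class and method names are mine, not from the PR or Flink's actual API): records are routed to one of a fixed number of record areas by their hash, so a rebuild only ever touches one partition at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: route each record to one of a fixed number of
// partitions by its hash, so rebuild/compaction works per partition.
class PartitionedRecordAreas {
    static final int NUM_PARTITIONS = 32; // same max as CompactingHashTable

    private final List<List<String>> recordAreas = new ArrayList<>();

    PartitionedRecordAreas() {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            recordAreas.add(new ArrayList<>());
        }
    }

    // Top bits of the hash choose the partition; the lower bits would
    // choose the bucket inside that partition.
    static int partitionOf(int hashCode) {
        return (hashCode >>> 27) & (NUM_PARTITIONS - 1);
    }

    void insert(String record) {
        recordAreas.get(partitionOf(record.hashCode())).add(record);
    }

    // Rebuilding is confined to a single partition: the other 31 record
    // areas are untouched, so the "full stop" is much shorter.
    void rebuildPartition(int p) {
        List<String> rebuilt = new ArrayList<>(recordAreas.get(p));
        recordAreas.set(p, rebuilt);
    }

    int sizeOf(int p) {
        return recordAreas.get(p).size();
    }
}
```

A spilled partition would then simply be one of these record areas written to disk, as in the other Flink hash tables.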
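The linear hashing suggestion could look roughly like this (a self-contained sketch with illustrative names, not code from the PR): the table grows by splitting exactly one bucket when it gets too full, so no full-table reorganization is ever needed.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of linear hashing: grow by splitting one bucket at a time.
// Keys in the already-split prefix use the finer hash function.
class LinearHashSketch {
    private static final int INITIAL_BUCKETS = 4;   // must be a power of two
    private static final int SPLIT_THRESHOLD = 4;   // avg. elements per bucket

    private final List<List<Integer>> buckets = new ArrayList<>();
    private int level = 0; // current splitting round
    private int next = 0;  // index of the next bucket to split
    private int size = 0;

    LinearHashSketch() {
        for (int i = 0; i < INITIAL_BUCKETS; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    private int bucketIndex(int key) {
        int h = key & (INITIAL_BUCKETS * (1 << level) - 1);      // h_level
        if (h < next) {
            // This bucket was already split in the current round,
            // so use the finer hash function h_{level+1}.
            h = key & (INITIAL_BUCKETS * (1 << (level + 1)) - 1);
        }
        return h;
    }

    void insert(int key) {
        buckets.get(bucketIndex(key)).add(key);
        size++;
        if (size > SPLIT_THRESHOLD * buckets.size()) {
            splitNext(); // grow by exactly one bucket, no full rebuild
        }
    }

    private void splitNext() {
        List<Integer> old = buckets.get(next);
        List<Integer> keep = new ArrayList<>();
        List<Integer> moved = new ArrayList<>();
        int fineMask = INITIAL_BUCKETS * (1 << (level + 1)) - 1;
        for (int key : old) {
            if ((key & fineMask) == next) keep.add(key);
            else moved.add(key);
        }
        buckets.set(next, keep);
        buckets.add(moved); // new bucket at index next + INITIAL_BUCKETS * 2^level
        next++;
        if (next == INITIAL_BUCKETS * (1 << level)) {
            level++;  // round finished: table size has doubled
            next = 0;
        }
    }

    boolean contains(int key) {
        return buckets.get(bucketIndex(key)).contains(key);
    }
}
```

The appeal for this table is that each split only moves the records of a single bucket, so the pause is bounded regardless of the total table size.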
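For extracting the `ReduceFunction`, one possible direction (purely a sketch with hypothetical names) is to let the table depend only on a small combine callback, so the same table can be reused where no `ReduceFunction` is available:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the table only needs a way to combine two values for the same
// key; that logic is injected instead of hard-wiring a ReduceFunction.
class DecoupledTable<T> {
    interface Combiner<U> {
        U combine(U stored, U incoming);
    }

    private final Map<Integer, T> slots = new HashMap<>();
    private final Combiner<T> combiner;

    DecoupledTable(Combiner<T> combiner) {
        this.combiner = combiner;
    }

    void insertOrCombine(int key, T value) {
        T old = slots.get(key);
        slots.put(key, old == null ? value : combiner.combine(old, value));
    }

    T get(int key) {
        return slots.get(key);
    }
}
```

A `ReduceFunction` would then just be one possible `Combiner` implementation, passed in by the combine driver.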
What do you think?
In any case, we need to update the documentation for the added
`CombineHint`. It would also be good to extend `ReduceITCase` with a few tests
that use `CombineHint.HASH`.