Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/1517#issuecomment-172939642
  
    Hello @fhueske, thanks for the comment and shepherding the PR!
    
    > You said, the combiner flushes the full hash table if it runs out of 
memory. Do you think it would be possible to track the update frequency of 
buckets and only flush the bucket with the least updates (or n buckets with 
least updates)? This might improve the performance for skewed input data.
    
    If we flush only some of the elements when memory is full, the append 
position of the record area doesn't move; the flushed records just leave holes 
behind. Since new records can only be appended at the end, that freed space 
can't be reused until we do a compaction, and the compaction traverses the 
entire table. So this idea could only pay off if we flush a substantial 
fraction of the elements at a time (like half of them).
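
    Just to make the bookkeeping in this suggestion concrete, it would need 
something along these lines (a standalone sketch with made-up names, not code 
from this PR):

    ```java
    import java.util.Comparator;
    import java.util.stream.IntStream;

    // Hypothetical helper: counts updates per bucket and picks the least-updated
    // buckets as candidates to emit and free when the table runs out of memory.
    class BucketFlushSketch {

        final int[] updateCounts; // one counter per hash table bucket

        BucketFlushSketch(int numBuckets) {
            this.updateCounts = new int[numBuckets];
        }

        // Called whenever a record is reduced into an entry of the given bucket.
        void recordUpdate(int bucket) {
            updateCounts[bucket]++;
        }

        // Returns the indices of the n buckets with the fewest updates.
        int[] pickBucketsToFlush(int n) {
            return IntStream.range(0, updateCounts.length)
                    .boxed()
                    .sorted(Comparator.comparingInt(b -> updateCounts[b]))
                    .limit(n)
                    .mapToInt(Integer::intValue)
                    .toArray();
        }
    }
    ```

    Even with such bookkeeping, the space freed by flushing those buckets 
would only become usable again after the compaction pass mentioned above.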
    
    This is an interesting idea, but I find it quite hard to form even an 
intuitive understanding of its performance effects. Tracking update frequencies 
would add some overhead, and at the moment I can't see how much skew in the 
data would be needed to make this worth it (or what percentage of the data 
should be flushed at a time, etc.).
    
    I'll think about this some more, but getting the trade-offs right here 
would probably require quite a lot of work: experimenting with different 
variants of the algorithm, with differently skewed data, and with different 
`distinct keys / total number of input elements` ratios. So we should probably 
postpone this until after this basic version is merged.
    
    > I agree that the sort-based strategy for reduce combiners can be removed 
eventually, when the hash-based strategy proves to work well. I would like to 
give it a bit more exposure though, before we drop the code.
    
    OK.
    
    > Porting the built-in aggregation functions, distinct, etc. from 
GroupReduceFunctions to ReduceFunctions sounds good. I think the reason for 
this design decision was that for the sort-based strategies ReduceFunctions did 
not have a benefit over GroupReduceFunctions. Instead, they caused more method 
invocations (once for each input record, compared to once per key).
    
    I see, thanks! I'll keep this in mind for now and open a JIRA once the 
dust settles around the hash-based combiner and we can see the performance 
differences more clearly.
    
    > It would be great if you could open a JIRA and add a short design 
document / API proposal for the changes on the serializers that you talked 
about. This would allow the community to review and discuss your proposal.
    
    OK, I will.
    
    Best,
    Gábor

