Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/1517#issuecomment-184696741
  
    Hello Fabian,
    
    I did 3. (https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b), 
because the machinery for that was already in place (see the condition in 
`compactOrThrow`). I chose a threshold of 5%. (This can probably be the same 
as in the solution set case, because if record lengths change a lot, we become 
very slow as the memory load approaches the total memory, so it is probably 
better to signal the memory problem to the user with an exception than to 
silently be very slow.)
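    To make the idea concrete, here is a minimal sketch of such a compact-or-throw decision. Everything in it is hypothetical except the 5% figure and the method name mentioned above; it is not the code from the commit:

```java
import java.io.EOFException;

// Hypothetical sketch of a compact-or-throw decision; the fields and the
// compact() helper are made up for illustration, only the 5% threshold and
// the overall idea come from the comment above.
class CompactionSketch {
    private static final double MIN_RECLAIMABLE_FRACTION = 0.05;

    private long totalBytes; // bytes currently allocated for the record area
    private long holeBytes;  // bytes wasted by abandoned (stale) records

    void compactOrThrow() throws EOFException {
        if (totalBytes > 0 && (double) holeBytes / totalBytes >= MIN_RECLAIMABLE_FRACTION) {
            compact(); // rewriting the live records densely reclaims the holes
        } else {
            // Compaction would reclaim too little; surface the memory problem
            // instead of silently becoming very slow.
            throw new EOFException("Hash table ran out of memory.");
        }
    }

    private void compact() {
        totalBytes -= holeBytes;
        holeBytes = 0;
    }
}
```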
    
    I also made some changes to the tests.
    
    For 2., the situation doesn't seem straightforward to me. For example, if 
there are not many length changes, then exactly the opposite should be done: we 
should emit from the end of the record area (rather than from the beginning), 
because if the data is skewed, the more common keys appear sooner, so they tend 
to sit near the beginning of the record area.
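    Just to illustrate the two policies being contrasted (nothing like this is in the PR; the segment-deque model is made up):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of the two emit policies discussed above: the
// record area is modeled as a deque of filled segments, and we evict either
// from the head (oldest records) or from the tail (newest records).
class EmitPolicySketch<S> {
    private final Deque<S> filledSegments = new ArrayDeque<>();

    /** Emit the most recently written segments first, keeping the early (and,
     *  under skew, most frequently updated) records resident in the table. */
    S emitFromEnd() {
        return filledSegments.pollLast();
    }

    /** The opposite policy: emit the oldest segments first. */
    S emitFromBeginning() {
        return filledSegments.pollFirst();
    }
}
```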
    
    The other ideas are also interesting, and I would love to experiment with 
them, but unfortunately I don't have much time for this at the moment. So I 
would suggest merging the non-partitioned version now; the partitioned version 
can then be implemented later, when I or someone else has some free time on 
their hands.
    
    (Btw., it would be very interesting to try machine learning techniques to 
make these decisions, which involve complicated trade-offs, dynamically based 
on the actual data (a rough sketch of the idea follows after this list):
    - Have some switches that control things like
      - what part of the record area to emit (begin or end; how much)
      - at what level of fragmentation we should compact instead of emitting
      - what load factor should trigger a resize
      - size of bucket area
      - how to choose which partition to emit
      - maybe even whether to do spilling in the combiner as well
      - whether to insert prefetch instructions for the random memory accesses 
that will probably incur a CPU cache miss (the trade-off here is that you then 
have to work with multiple consecutive input records at the same time, so you 
have to do extra copies if object reuse is enabled, which might cost a lot; I 
have actually experimented with this a little, and saw 20-35% speedups when 
copies are cheap)
      - ... (it's easy to come up with many more)
    - Gather some statistics about what is happening, and turn them into 
features
      - avg. record size
      - #keys / #elements ratio
      - skew
      - time it takes to serialize a record
      - time it takes to run the ReduceFunction
      - ratio of updates that involve size changes
      - whether sizes are trending up or down on average
      - backpressure
        - that we are generating
        - that we get from our outputs (if this is large, e.g. because of a 
saturated network, then we should set the switches to do more aggressive 
combining)
      - how many CPU cache misses occur while looking up keys (e.g. to 
recognize the situation where records with matching keys are often close to 
each other for some reason)
      - hash collisions (so that we can start with a simpler hash function (a 
few percent speedup) and change it if it turns out to be bad)
      - ... (it's easy to come up with many more)
    - Train some machine learning model that figures out how to set the 
switches based on the features
    
    I think a pretty good speedup could result from tuning all these things to 
the actual data at hand.
    Maybe in a few years, when dataflow systems are more mature, this could 
become a reality.)
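    As announced above, here is a very rough sketch of what the switch/feature plumbing could look like. All names are invented and the rules in `decide` are just placeholders for whatever a trained model would produce:

```java
// Hypothetical sketch of the tuning idea above: collect runtime statistics as
// features and let a (pluggable) model map them to the tuning switches.
// None of these classes exist in Flink; they only illustrate the idea.
class CombinerTuningSketch {

    /** A few of the statistics listed above, turned into a feature vector. */
    static class Features {
        double avgRecordBytes;
        double keysPerElementRatio;
        double sizeChangeRatio;     // fraction of updates that change the record length
        double outputBackpressure;  // 0..1, how congested our outputs are
    }

    /** The switches that the model is allowed to set. */
    static class Switches {
        double compactionThreshold; // fragmentation ratio that triggers compaction
        double resizeLoadFactor;    // load factor that triggers a bucket-area resize
        boolean emitFromEnd;        // which part of the record area to emit
    }

    /** Stand-in for a trained model; here just a couple of hand-written rules. */
    static Switches decide(Features f) {
        Switches s = new Switches();
        s.compactionThreshold = f.sizeChangeRatio > 0.5 ? 0.02 : 0.05;
        s.resizeLoadFactor = 0.75;
        // Under heavy output backpressure, combine more aggressively by keeping
        // the hot (early) records in the table and emitting from the end.
        s.emitFromEnd = f.outputBackpressure > 0.8;
        return s;
    }
}
```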

