GitHub user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3765
  
    Hi @haohui, 
    
    I suggested earlier that the approach of PR #3771 might be used for DISTINCT
    group window functions. However, this does not work because we cannot register
    state for an AggregateFunction. The benefit of the #3771 approach would have
    been that it does not need to deserialize the Map every time a record is
    accumulated (or retracted). Instead, the distinct values are kept in a MapState
    that can be accessed (and deserialized) per lookup key. But this approach does
    not work with the AggregateFunction that we use for early aggregation.
    
    To be honest, I'm a bit concerned about the performance of this PR's approach,
    because the state of the DistinctAccumulator (i.e., the complete map) will be
    de/serialized every time we access it.
    
    I think we can use this approach for now, but we should look into whether we
    can use an approach similar to the batch side, where distinct aggregations (on
    different keys) are translated into multiple aggregations that are later
    joined together (the join would be rather cheap because it's a 1-to-1 join).
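The batch-side idea can be sketched roughly as follows, assuming a hypothetical query `SELECT key, COUNT(DISTINCT a), SUM(DISTINCT b) ... GROUP BY key`. This uses in-memory Java streams rather than Flink operators, and all names are illustrative. Each distinct aggregate becomes its own plan: deduplicate `(key, field)` pairs, then aggregate per key. Both results are grouped by the same key, so joining them pairs exactly one row from each side per key, which is why the join is cheap.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration of splitting distinct aggregates and joining them 1-to-1.
class DistinctSplit {
    record Rec(String key, int a, int b) {}

    // COUNT(DISTINCT a) per key: deduplicate (key, a) pairs, then count per key.
    static Map<String, Long> countDistinctA(List<Rec> in) {
        return in.stream()
                .map(r -> Map.entry(r.key(), r.a()))
                .distinct()
                .collect(Collectors.groupingBy(Map.Entry::getKey, TreeMap::new,
                        Collectors.counting()));
    }

    // SUM(DISTINCT b) per key: deduplicate (key, b) pairs, then sum per key.
    static Map<String, Long> sumDistinctB(List<Rec> in) {
        return in.stream()
                .map(r -> Map.entry(r.key(), r.b()))
                .distinct()
                .collect(Collectors.groupingBy(Map.Entry::getKey, TreeMap::new,
                        Collectors.summingLong(e -> e.getValue().longValue())));
    }

    // Both inputs are keyed by the same grouping key, so each key matches exactly once.
    static Map<String, long[]> join(Map<String, Long> left, Map<String, Long> right) {
        Map<String, long[]> out = new TreeMap<>();
        left.forEach((k, v) -> out.put(k, new long[] {v, right.get(k)}));
        return out;
    }
}
```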
    
    I'll have a look at this PR later today.
    Thanks, Fabian

