GitHub user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    Hi @docete,
    
    Thanks for posting the plan. It looks OK, but only because the query
    computes non-grouped aggregates.
    
    In the case of grouped aggregates, the resulting plan will be less efficient
    than the plan I proposed, because it will not reuse the existing partitioning
    and sorting properties of the data. This results in at least one shuffle
    and one full sort for each distinct aggregate.
    
    The trick is to explicitly partition the data for the distinct operation on
    only a subset of the attributes that would usually be used, namely the
    grouping keys. The subsequent aggregations and joins can then be performed
    in a streaming fashion without partitioning or sorting the data again. This
    is not possible with your plan.
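    To make the trick concrete, here is a minimal, engine-independent Python
    simulation, assuming a query like
    `SELECT key, COUNT(DISTINCT x), SUM(DISTINCT y) FROM t GROUP BY key`.
    It is not Flink code: `hash_partition`, `distinct_agg_stream`,
    `merge_join`, and `grouped_distinct_aggregates` are hypothetical helpers.
    The data is shuffled once on the grouping key and sorted once per
    partition; each distinct aggregate and the final join then run as
    streaming passes over that same data.

```python
from itertools import groupby
from operator import itemgetter


def hash_partition(rows, key_idx, num_parts):
    """The single shuffle: route every row to a partition by its grouping key."""
    parts = [[] for _ in range(num_parts)]
    for row in rows:
        parts[hash(row[key_idx]) % num_parts].append(row)
    return parts


def distinct_agg_stream(sorted_part, key_idx, field_idx, agg_fn):
    """One distinct aggregate as a streaming pass over key-sorted rows.

    The data is already partitioned on the grouping key, which is a subset of
    the distinct attributes (key, field), so the distinct step is purely local.
    """
    for key, group in groupby(sorted_part, key=itemgetter(key_idx)):
        distinct_vals = {row[field_idx] for row in group}  # dedupe within the group
        yield key, agg_fn(distinct_vals)


def merge_join(streams):
    """Join per-aggregate streams on the key without shuffling or re-sorting.

    All streams come from the same sorted partition, so they emit the same
    keys in the same order and can be consumed in lockstep.
    """
    for entries in zip(*streams):
        key = entries[0][0]
        if any(e[0] != key for e in entries):
            raise ValueError("streams are not co-sorted on the grouping key")
        yield (key,) + tuple(value for _, value in entries)


def grouped_distinct_aggregates(rows, key_idx, distinct_specs, num_parts=4):
    """Evaluate several DISTINCT aggregates grouped by row[key_idx].

    distinct_specs is a list of (field_idx, agg_fn) pairs, e.g. (1, len) for
    COUNT(DISTINCT ...) or (2, sum) for SUM(DISTINCT ...).
    """
    results = []
    for part in hash_partition(rows, key_idx, num_parts):  # one shuffle in total
        part.sort(key=itemgetter(key_idx))  # one local sort per partition
        streams = [
            distinct_agg_stream(part, key_idx, field_idx, agg_fn)
            for field_idx, agg_fn in distinct_specs
        ]
        results.extend(merge_join(streams))
    return results


if __name__ == "__main__":
    # Hypothetical table t(key, x, y) and the query
    # SELECT key, COUNT(DISTINCT x), SUM(DISTINCT y) FROM t GROUP BY key
    t = [("a", 1, 10), ("a", 1, 20), ("a", 2, 20), ("b", 3, 5), ("b", 3, 5)]
    print(sorted(grouped_distinct_aggregates(t, 0, [(1, len), (2, sum)])))
    # prints [('a', 2, 30), ('b', 1, 5)]
```

    The set-based dedupe inside each group only stands in for whatever local
    distinct strategy an engine would use; the point is that nothing after
    `hash_partition` moves data between partitions or re-sorts it.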
    
    With some tweaking, we could try to implement this as an optimization rule
    (a custom one, based on the rule you copied from Calcite) or implement it
    as a dedicated `DataSetRelNode` for distinct aggregates. I'm more in favor
    of the latter option.
    Once the optimizer tracks physical data properties, it will be easier to
    implement distinct aggregates using optimizer rules.
    
    What do you think?

