[ 
https://issues.apache.org/jira/browse/CALCITE-2216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406121#comment-16406121
 ] 

Fabian Hueske commented on CALCITE-2216:
----------------------------------------

I don't think we would need that for Flink.

We have a rule that converts
{code:java}
LogicalProject(EXPR$0=[$1], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE_END($0)])
  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
    LogicalProject($f0=[TUMBLE($4, 900000)], a=[$0]){code}
into something like
{code:java}
LogicalProject(EXPR$0=[$1], EXPR$2, EXPR$3)
  LogicalWindowAggregate(group=[{0}], EXPR$0=[COUNT($1)], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE_END($0)]){code}
We do that to have all information that we need in a single operator 
({{LogicalWindowAggregate}}) because we are using Flink's built-in window 
operators for the three window types and don't want the information to be 
spread across three RelNodes.

Regarding this proposal, we had problems forwarding {{TUMBLE_START($0)}} and 
{{TUMBLE_END($0)}} because they were represented as additional properties of the 
{{LogicalWindowAggregate}} and not as aggregate functions. It should have been 
possible to make them aggregate functions, but I felt that this would not be 
a clean solution because they do not actually aggregate anything.

Anyway, we can also change Flink's implementation to move the window auxiliary 
functions into the aggregation calls of {{Aggregate}}. 
Until then, we'd just keep a modified copy of the rule in our code.
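
To make the extension point from the issue description concrete, here is a minimal, self-contained sketch of the idea. All class and method names ({{ReduceRuleSketch}}, {{WindowReduceRuleSketch}}, {{newAggregate}}, {{newCalc}}) are hypothetical stand-ins and not Calcite's or Flink's actual API; operators are modeled as plain strings only to show which hook decides what gets forwarded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reduce-functions rule; NOT Calcite's actual API.
class ReduceRuleSketch {
    // Fixed rewrite sequence; the two creation steps are overridable hooks.
    final List<String> apply(List<String> aggCalls, List<String> projects) {
        List<String> plan = new ArrayList<>();
        plan.add(newCalc(projects));      // top operator
        plan.add(newAggregate(aggCalls)); // its input
        return plan;
    }

    // Hook that already exists in spirit: build the reduced aggregate.
    protected String newAggregate(List<String> aggCalls) {
        return "Aggregate" + aggCalls;
    }

    // Hook proposed in the issue: build the projection on top of the aggregate.
    protected String newCalc(List<String> projects) {
        return "Calc" + projects;
    }
}

// Hypothetical subclass that forwards window properties through both operators.
class WindowReduceRuleSketch extends ReduceRuleSketch {
    private final List<String> windowProperties;

    WindowReduceRuleSketch(List<String> windowProperties) {
        this.windowProperties = windowProperties;
    }

    @Override
    protected String newAggregate(List<String> aggCalls) {
        return "WindowAggregate" + aggCalls + " properties" + windowProperties;
    }

    @Override
    protected String newCalc(List<String> projects) {
        // Without this override the base Calc would drop the window properties.
        List<String> all = new ArrayList<>(projects);
        all.addAll(windowProperties);
        return "Calc" + all;
    }
}
```

With only the aggregate hook overridable, a subclass could emit the window properties from the aggregate but the base projection would not pass them on; the second hook closes that gap.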

 

> Improve extensibility of AggregateReduceFunctionsRule
> -----------------------------------------------------
>
>                 Key: CALCITE-2216
>                 URL: https://issues.apache.org/jira/browse/CALCITE-2216
>             Project: Calcite
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 1.15.0
>            Reporter: Fabian Hueske
>            Assignee: Julian Hyde
>            Priority: Minor
>
> I'm proposing to improve the extensibility of 
> {{AggregateReduceFunctionsRule}}. The purpose of the rule is to decompose 
> complex aggregation functions like {{VAR_POP}} and {{STDDEV_SAMP}} into 
> {{COUNT}} and {{SUM}} functions and compute the original functions in a 
> subsequent Calc operator.
> Right now, the rule class provides a {{protected}} method that can be 
> overridden to create an {{Aggregate}} with the updated aggregate calls.
> We are using the rule in Flink and have a special {{Aggregate}} Rel for 
> group-windowed aggregations ({{GROUP BY TUMBLE/HOP/SESSION}}). Our 
> implementation needs to forward some additional fields from the 
> {{Aggregate}} for window properties like {{TUMBLE_START}} or {{HOP_END}}. In 
> the current form, we cannot extend the rule, because these fields are stripped 
> off by the {{Calc}} node that is automatically added by the rule.
> I'm proposing to also move the code to create the {{Calc}} into a 
> {{protected}} method just like the code to create the new {{Aggregate}}.
> I know, this is a fairly Flink-specific issue, but the code changes are 
> minimal (no change in functionality) and it would help us, because we would 
> not need to copy the rule and maintain it in Flink.
> I'll open a PR for this. Looking forward to your comments.
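
As an illustration of the decomposition the issue description mentions, the sketch below computes {{VAR_POP}} and {{STDDEV_SAMP}} from nothing but {{SUM(x)}}, {{SUM(x * x)}} and {{COUNT(x)}}. It uses the textbook identities; the class and method names are hypothetical, and the exact expressions the rule generates (casts, null handling) may differ.

```java
// Textbook identities, for illustration only; not the rule's actual output.
class VarianceDecompositionSketch {
    // The simple aggregates an Aggregate operator would compute:
    // returns {SUM(x), SUM(x * x), COUNT(x)}.
    static double[] simpleAggregates(double[] xs) {
        double sum = 0;
        double sumSq = 0;
        for (double x : xs) {
            sum += x;
            sumSq += x * x;
        }
        return new double[] {sum, sumSq, xs.length};
    }

    // What a subsequent projection would derive from those aggregates.
    static double varPop(double sum, double sumSq, double count) {
        return (sumSq - sum * sum / count) / count;
    }

    static double stddevSamp(double sum, double sumSq, double count) {
        return Math.sqrt((sumSq - sum * sum / count) / (count - 1));
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the aggregates are SUM = 40, SUM of squares = 232 and COUNT = 8, so the population variance is (232 - 1600 / 8) / 8 = 4.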



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
