Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Adding `ROWTIME()` as an expression to enable users to specify event time 
windows.
    
    After trying multiple approaches at the end I settled down with translating 
`LogicalAggregate` directly to `DataStreamAggregate`. The translation removes 
the group-by expression from the aggregate and adds the same expression as a 
window.
    
    Note that `ROWTIME()` is actually translated to a call to the local 
timestamp. The expression has to be executable because Calcite creates a new 
project operator to compute the group-by expression, where the expression has 
to be executed. For example, the following query
    
    ```
    SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
    ```
    
    will be translated to:
    
    ```
    LogicalAggregate(group={$0}, agg={COUNT(*)})
      LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
      ...
    ```
    
    It's tempting to remove the group-by expression from the logical plan. 
However, it cannot be done using the optimization frameworks in Calcite. These 
frameworks expect the output types of the operators stay the same before and 
after the transformations. Removing the field actually changes the types thus 
Calcite will complain.
    
    The down side of this approach is that it might be difficult for Flink to 
catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY 
FLOOR(ROWTIME() / 2 TO HOUR)` at compile-time. Any ideas to improve the 
situation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to