Github user haohui commented on the issue:
https://github.com/apache/flink/pull/3252
Adding `ROWTIME()` as an expression to enable users to specify event time
windows.
After trying multiple approaches, I settled on translating
`LogicalAggregate` directly to `DataStreamAggregate`. The translation removes
the group-by expression from the aggregate and adds the same expression as a
window.
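To make the idea concrete, here is a rough sketch of that translation as a toy Python model. It is not the Flink or Calcite code: the `Call` class and the `translate` and `is_rowtime_floor` helpers are hypothetical, only the operator names `LogicalAggregate` and `DataStreamAggregate` come from the PR, and the group-by expression is inlined into the aggregate for simplicity instead of being referenced through a project.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Call:
    """Toy expression node: an operator name plus its operands."""
    op: str
    operands: Tuple = ()


@dataclass(frozen=True)
class LogicalAggregate:
    group_exprs: Tuple  # group-by expressions (inlined here for simplicity)
    agg_calls: Tuple    # e.g. ("COUNT(*)",)


@dataclass(frozen=True)
class DataStreamAggregate:
    group_exprs: Tuple      # remaining, non-window group-by expressions
    agg_calls: Tuple
    window: Optional[Call]  # the expression moved out of the group-by


def is_rowtime_floor(expr) -> bool:
    """True only for the exact shape FLOOR(ROWTIME() TO <unit>)."""
    return (
        isinstance(expr, Call)
        and expr.op == "FLOOR"
        and len(expr.operands) == 2
        and expr.operands[0] == Call("ROWTIME")
    )


def translate(agg: LogicalAggregate) -> DataStreamAggregate:
    """Move the ROWTIME() group-by expression into a window."""
    window_exprs = [e for e in agg.group_exprs if is_rowtime_floor(e)]
    if len(window_exprs) > 1:
        raise ValueError("at most one window expression is supported")
    remaining = tuple(e for e in agg.group_exprs if not is_rowtime_floor(e))
    window = window_exprs[0] if window_exprs else None
    return DataStreamAggregate(remaining, agg.agg_calls, window)


if __name__ == "__main__":
    hourly = Call("FLOOR", (Call("ROWTIME"), "HOUR"))
    print(translate(LogicalAggregate((hourly,), ("COUNT(*)",))))
```

In this model the window expression disappears from the group-by list and reappears as the `window` field, so the aggregate itself no longer has to evaluate it.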
Note that `ROWTIME()` is actually translated to a call to the local
timestamp. The expression has to be executable because Calcite creates a new
project operator to compute the group-by expression, and that operator has to
evaluate it. For example, the following query
```
SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
```
will be translated to:
```
LogicalAggregate(group={$0}, agg={COUNT(*)})
LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
...
```
It's tempting to remove the group-by expression from the logical plan.
However, this cannot be done using the optimization frameworks in Calcite. These
frameworks expect the output types of the operators to stay the same before and
after a transformation. Removing the field changes the types, so
Calcite will complain.
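The constraint can be illustrated with a toy check in Python. This only models the idea that a rewrite must preserve the operator's output row type; Calcite's real check is more involved, and the `check_same_row_type` helper as well as the field names and types below are made up for illustration.

```python
from typing import List, Tuple

# A row type is modelled as an ordered list of (field name, type name) pairs.
RowType = List[Tuple[str, str]]


def check_same_row_type(before: RowType, after: RowType) -> None:
    """Reject a rewrite whose output row type differs from the original."""
    if before != after:
        raise TypeError(f"row type changed: {before} -> {after}")


# Hypothetical output of the aggregate: the group key followed by the count.
before = [("$f0", "TIMESTAMP"), ("EXPR$1", "BIGINT")]
# Dropping the group-by expression also drops its output field.
after = [("EXPR$1", "BIGINT")]

if __name__ == "__main__":
    try:
        check_same_row_type(before, after)
    except TypeError as err:
        print("rewrite rejected:", err)
```

Under this model, any rule that removes the group key from the aggregate is rejected, which is why the removal happens during the translation to `DataStreamAggregate` instead.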
The downside of this approach is that it might be difficult for Flink to
catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY
FLOOR(ROWTIME() / 2 TO HOUR)` at compile time. Any ideas to improve the
situation?