Hi,

I did a bit of prototyping yesterday to check to what extent Calcite supports window operations on streams if we implemented them for the Table API. For the Table API we do not go through Calcite's SQL parser and validator, but generate the logical plan (a tree of RelNodes) ourselves, mostly using Calcite's RelBuilder. It turns out that Calcite does not restrict grouped aggregations on streams at this abstraction level, i.e., it does not perform any checks.
I think it should be possible to implement windowed aggregates for the Table API. Once CALCITE-1345 [1] is implemented (and released), windowed aggregates will also be supported by the SQL parser, validator, and optimizer. To make them work with our implementation we would need to adapt our solution to it (only internally), but I am sure we could reuse a lot of our initial implementation (Table API, validation, execution).

I drafted an API proposal a few months ago [2] and could convert it into a FLIP to discuss the API and break it down into subtasks.

What do you think?

Cheers, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345
[2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E

2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Jark,
>
> thanks for starting this discussion. Actually, I think we are "blocked"
> on the internal handling of streaming windows in Calcite rather than on
> the SQL parser. IMO, it should be possible to exchange or modify the
> parser if we want that.
>
> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword,
> Calcite closely follows the SQL standard (e.g., there are no special
> keywords like WINDOW; instead, stream-specific aspects like tumbling
> windows are expressed as functions such as TUMBLE [1]). One main
> motivation of the Calcite community is to have the same syntax for
> streaming and static tables. This includes support for tables which are
> static and streaming at the same time (the example of [1] is a table
> about orders to which new order records are added). When querying such a
> table, the STREAM keyword is required to distinguish the case of a batch
> query, which returns a result set, from a standing query, which returns
> a result stream. In the context of Flink we can make the distinction
> using the type of the TableEnvironment. So we could use the batch
> parser, but would need to change a couple of things internally and add
> checks for proper grouping on the timestamp column when doing windows,
> etc. So far the discussion about the StreamSQL syntax has focused on the
> question whether 1) StreamSQL should follow the SQL standard (as Calcite
> proposes) or 2) Flink should use a custom syntax with stream-specific
> features. For instance, a tumbling window is expressed in the GROUP BY
> clause [1] when following standard SQL, but it could be defined using a
> special WINDOW keyword in a custom StreamSQL dialect.
>
> You are right that we have a dependency on Calcite. However, I think
> this dependency is in the internals rather than the parser, i.e., how
> the validator/optimizer supports and handles monotone / quasi-monotone
> attributes and windows. I am not sure how much is already supported, but
> the Calcite community is working on this [2]. I think we need these
> features in Calcite unless we want to completely remove our dependency
> on Calcite for StreamSQL. I would not be in favor of removing Calcite at
> this point. We put a lot of effort into refactoring the Table API
> internals. Instead we should start to talk to the Calcite community and
> see how far they are, what is missing, and how we can help.
>
> I will start a discussion on the Calcite dev mailing list in the next
> few days and ask about the status of StreamSQL.
>
> Best,
> Fabian
>
> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
> [2] https://issues.apache.org/jira/browse/CALCITE-1345
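
To make the "tumbling window as a function in the GROUP BY clause" idea from the quoted message concrete: conceptually, such a grouping amounts to mapping each record's timestamp to the start of its window and aggregating per window start. The following is only a minimal plain-Java sketch of that idea. The names `TumbleSketch`, `Order`, `tumbleStart`, and `unitsPerWindow` are hypothetical, timestamps are assumed to be epoch milliseconds, and this is neither Calcite's nor Flink's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleSketch {
    // Hypothetical record type: an event timestamp (ms) and a unit count.
    static final class Order {
        final long rowtimeMillis;
        final int units;

        Order(long rowtimeMillis, int units) {
            this.rowtimeMillis = rowtimeMillis;
            this.units = units;
        }
    }

    // Start of the tumbling window of the given size that contains the timestamp.
    static long tumbleStart(long rowtimeMillis, long sizeMillis) {
        return Math.floorDiv(rowtimeMillis, sizeMillis) * sizeMillis;
    }

    // Sums units per window; the window start plays the role of the grouping key.
    static Map<Long, Integer> unitsPerWindow(List<Order> orders, long sizeMillis) {
        Map<Long, Integer> result = new TreeMap<>();
        for (Order o : orders) {
            result.merge(tumbleStart(o.rowtimeMillis, sizeMillis), o.units, Integer::sum);
        }
        return result;
    }
}
```

The sketch works on a finite list. On an unbounded stream, a window's result can only be emitted once the timestamp attribute is known to have passed the window end, which is where the monotone / quasi-monotone attribute handling mentioned above comes in.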