It seems that we have already done that, if I understand correctly. BatchTableEnvironment.registerTableSource(name, tableSource) only accepts a BatchTableSource, whereas StreamTableEnvironment.registerTableSource(name, tableSource) only accepts a StreamTableSource. So if a TableSource implements both batch and stream, whether it is used as batch or stream is determined by the type of the table environment. I think TableSourceITCase.testCsvTableSource in the batch and stream packages illustrates this. Am I right?
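To make the idea concrete, here is a minimal, self-contained Java sketch of that dispatch. Everything in it is a simplified stand-in written only for illustration: BatchEnv, StreamEnv, CsvLikeSource and the scanBatch/scanStream methods are hypothetical names, and the two interfaces only borrow the names BatchTableSource and StreamTableSource. None of this is the real Flink API or the code in the prototype branch.

```java
import java.util.HashMap;
import java.util.Map;

public class TableSourceDispatchSketch {

    // Simplified stand-ins for illustration only, NOT the real Flink interfaces.
    public interface TableSource {}

    public interface BatchTableSource extends TableSource {
        String scanBatch();   // hypothetical method
    }

    public interface StreamTableSource extends TableSource {
        String scanStream();  // hypothetical method
    }

    // A hypothetical source that implements both variants, like a CSV source might.
    public static class CsvLikeSource implements BatchTableSource, StreamTableSource {
        @Override public String scanBatch()  { return "batch scan"; }
        @Override public String scanStream() { return "stream scan"; }
    }

    // Stand-in for a batch table environment: its register method only
    // accepts a BatchTableSource, so the batch side of the source is used.
    public static class BatchEnv {
        private final Map<String, BatchTableSource> catalog = new HashMap<>();

        public void registerTableSource(String name, BatchTableSource source) {
            catalog.put(name, source);
        }

        public String scan(String name) {
            return catalog.get(name).scanBatch();
        }
    }

    // Stand-in for a stream table environment: its register method only
    // accepts a StreamTableSource, so the stream side of the source is used.
    public static class StreamEnv {
        private final Map<String, StreamTableSource> catalog = new HashMap<>();

        public void registerTableSource(String name, StreamTableSource source) {
            catalog.put(name, source);
        }

        public String scan(String name) {
            return catalog.get(name).scanStream();
        }
    }

    public static void main(String[] args) {
        CsvLikeSource source = new CsvLikeSource();

        BatchEnv batchEnv = new BatchEnv();
        batchEnv.registerTableSource("csv", source);

        StreamEnv streamEnv = new StreamEnv();
        streamEnv.registerTableSource("csv", source);

        // The same source object is treated as batch or stream purely
        // because of the environment it was registered with.
        System.out.println(batchEnv.scan("csv"));
        System.out.println(streamEnv.scan("csv"));
    }
}
```

The point of the sketch is that no keyword in the query and no default choice is needed: the parameter type of each register method selects the batch or stream side of a source that implements both.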
- Jark Wu

> On Aug 29, 2016, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>
> At first glance, I thought we were losing the ability to distinguish
> between a batch and a streaming table if a TableSource implements both,
> because currently you use a StreamTableSource as the default when a
> TableSource implements both types. I think it would be better to
> determine batch or stream using the type of execution environment.
> What do you think?
>
> Timo
>
>
> On 29/08/16 at 14:31, Jark Wu wrote:
>> Hi Timo,
>>
>> Yes, you are right. The STREAM keyword is invalid now. If there is a
>> STREAM keyword in the query, the parser will throw a "can’t convert
>> table xxx to stream" exception, because we register the table as a
>> regular table, not a streamable one.
>>
>> - Jark Wu
>>
>>> On Aug 29, 2016, at 8:13 PM, Timo Walther <twal...@apache.org> wrote:
>>>
>>> Hi Jark,
>>>
>>> your code looks good and it also simplifies many parts. So the STREAM
>>> keyword is not optional but invalid now, right? What happens if the
>>> keyword appears in the query?
>>>
>>> Timo
>>>
>>>
>>> On 29/08/16 at 05:40, Jark Wu wrote:
>>>> Hi Fabian, Timo,
>>>>
>>>> I have created a prototype for removing the STREAM keyword and using
>>>> the batch SQL parser for stream jobs.
>>>>
>>>> This is the working branch:
>>>> https://github.com/wuchong/flink/tree/remove-stream
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> - Jark Wu
>>>>
>>>>> On Aug 24, 2016, at 4:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Starting with a prototype would be great, Jark.
>>>>> We had some trouble with Calcite's StreamableTable interface anyway.
>>>>> A few things can be simplified if we do not declare our tables as
>>>>> streamable. I would try to implement DataStreamTable (and all related
>>>>> classes and methods) equivalent to DataSetTables if possible.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> You are right, the main thing we need to change to remove the STREAM
>>>>>> keyword is the table registration. If you would like, I can build a
>>>>>> prototype.
>>>>>>
>>>>>> Hi Timo,
>>>>>>
>>>>>> I’m glad to contribute our work back to Flink. I will look into it
>>>>>> and create JIRAs in the next few days.
>>>>>>
>>>>>> - Jark Wu
>>>>>>
>>>>>>> On Aug 24, 2016, at 12:13 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> We can think about whether or not to remove the STREAM keyword. In
>>>>>>> principle, Calcite should allow the same windowing syntax on
>>>>>>> streaming and static tables (this is one of the main goals of
>>>>>>> Calcite). The Table API can also distinguish stream and batch
>>>>>>> without the STREAM keyword by looking at the ExecutionEnvironment.
>>>>>>> I think we would need to change the way that tables are registered
>>>>>>> in Calcite's catalog and also add more validation (check that time
>>>>>>> windows refer to a time column, etc.).
>>>>>>> A prototype should help to see what the consequences of removing the
>>>>>>> STREAM keyword would be (which actually means changing the table
>>>>>>> registration; the parser stays the same).
>>>>>>>
>>>>>>> Regarding streaming aggregates without window definition: We can
>>>>>>> certainly implement this feature in the Table API. There are a few
>>>>>>> points that need to be considered, like value expiration after a
>>>>>>> certain time of update inactivity (otherwise the state might grow
>>>>>>> without bound). But these aspects should be rather easy to solve. I
>>>>>>> think for SQL, such running aggregates are a special case of the
>>>>>>> Sliding Windows as discussed in Calcite's StreamSQL document [1].
>>>>>>>
>>>>>>> Thanks also for the document!
>>>>>>> I'll take that into account when sketching the FLIP for streaming
>>>>>>> aggregation support.
>>>>>>>
>>>>>>> Cheers, Fabian
>>>>>>>
>>>>>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows
>>>>>>>
>>>>>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>>>>
>>>>>>>> Hi Fabian, Timo,
>>>>>>>>
>>>>>>>> Sorry for the late response.
>>>>>>>>
>>>>>>>> Regarding Calcite’s StreamSQL syntax, my only concerns are the
>>>>>>>> STREAM keyword and the lack of aggregation without a window, which
>>>>>>>> make the syntax differ between streaming and static tables. I
>>>>>>>> don’t think Flink should have a custom SQL syntax, but it would be
>>>>>>>> better to have a consistent syntax for batch and streaming.
>>>>>>>> Regarding window syntax, I think it’s good and reasonable to
>>>>>>>> follow Calcite’s syntax. Actually, we implemented Blink SQL
>>>>>>>> windows following Calcite’s syntax [1].
>>>>>>>>
>>>>>>>> In addition, I describe the Blink SQL design, including UDF, UDTF,
>>>>>>>> UDAF, and Window, in a Google doc [1]. I hope that helps with the
>>>>>>>> upcoming Flink SQL design.
>>>>>>>>
>>>>>>>> +1 for creating a FLIP
>>>>>>>>
>>>>>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
>>>>>>>>
>>>>>>>>
>>>>>>>> - Jark Wu
>>>>>>>>
>>>>>>>>> On Aug 23, 2016, at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I did a bit of prototyping yesterday to check to what extent
>>>>>>>>> Calcite supports window operations on streams if we implemented
>>>>>>>>> them for the Table API.
>>>>>>>>> For the Table API we do not go through Calcite's SQL parser and
>>>>>>>>> validator, but generate the logical plan (tree of RelNodes)
>>>>>>>>> ourselves, mostly using Calcite's RelBuilder.
>>>>>>>>> It turns out that Calcite does not restrict grouped aggregations
>>>>>>>>> on streams at this abstraction level, i.e., it does not perform
>>>>>>>>> any checks.
>>>>>>>>>
>>>>>>>>> I think it should be possible to implement windowed aggregates
>>>>>>>>> for the Table API. Once CALCITE-1345 [1] is implemented (and
>>>>>>>>> released), windowed aggregates will also be supported by the SQL
>>>>>>>>> parser, validator, and optimizer. In order to make them work with
>>>>>>>>> our implementation we would need to adapt our solution to it
>>>>>>>>> (only internally), but I am sure we could reuse a lot of our
>>>>>>>>> initial implementation (Table API, validation, execution).
>>>>>>>>>
>>>>>>>>> I drafted an API proposal a few months ago [2] and could convert
>>>>>>>>> this into a FLIP to discuss the API and break it down into
>>>>>>>>> subtasks.
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Cheers, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
>>>>>>>>>
>>>>>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Jark,
>>>>>>>>>>
>>>>>>>>>> thanks for starting this discussion. Actually, I think we are
>>>>>>>>>> "blocked" on the internal handling of streaming windows in
>>>>>>>>>> Calcite rather than on the SQL parser. IMO, it should be
>>>>>>>>>> possible to exchange or modify the parser if we want that.
>>>>>>>>>>
>>>>>>>>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM
>>>>>>>>>> keyword, Calcite closely follows the SQL standard (e.g., no
>>>>>>>>>> special keywords like WINDOW; instead, stream-specific aspects
>>>>>>>>>> like tumbling windows are expressed as functions such as TUMBLE
>>>>>>>>>> [1]). One main motivation of the Calcite community is to have
>>>>>>>>>> the same syntax for streaming and static tables.
>>>>>>>>>> This includes support for tables which are static and streaming
>>>>>>>>>> at the same time (the example of [1] is a table about orders to
>>>>>>>>>> which new order records are added). When querying such a table,
>>>>>>>>>> the STREAM keyword is required to distinguish a batch query,
>>>>>>>>>> which returns a result set, from a standing query, which returns
>>>>>>>>>> a result stream. In the context of Flink we can make the
>>>>>>>>>> distinction using the type of the TableEnvironment. So we could
>>>>>>>>>> use the batch parser, but would need to change a couple of
>>>>>>>>>> things internally and add checks for proper grouping on the
>>>>>>>>>> timestamp column when doing windows, etc. So far the discussion
>>>>>>>>>> about the StreamSQL syntax has focused on the question of
>>>>>>>>>> whether 1) StreamSQL should follow the SQL standard (as Calcite
>>>>>>>>>> proposes) or 2) Flink should use a custom syntax with
>>>>>>>>>> stream-specific features. For instance, a tumbling window is
>>>>>>>>>> expressed in the GROUP BY clause [1] when following standard
>>>>>>>>>> SQL, but it could be defined using a special WINDOW keyword in a
>>>>>>>>>> custom StreamSQL dialect.
>>>>>>>>>>
>>>>>>>>>> You are right that we have a dependency on Calcite. However, I
>>>>>>>>>> think this dependency is in the internals rather than the
>>>>>>>>>> parser, i.e., how the validator/optimizer supports and handles
>>>>>>>>>> monotone / quasi-monotone attributes and windows. I am not sure
>>>>>>>>>> how much is already supported, but the Calcite community is
>>>>>>>>>> working on this [2]. I think we need these features in Calcite
>>>>>>>>>> unless we want to completely remove our dependency on Calcite
>>>>>>>>>> for StreamSQL. I would not be in favor of removing Calcite at
>>>>>>>>>> this point. We put a lot of effort into refactoring the Table
>>>>>>>>>> API internals.
>>>>>>>>>> Instead we should start to talk to the Calcite community and
>>>>>>>>>> see how far they are, what is missing, and how we can help.
>>>>>>>>>>
>>>>>>>>>> I will start a discussion on the Calcite dev mailing list in the
>>>>>>>>>> next few days and ask about the status of StreamSQL.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Fabian
>>>>>>>>>>
>>>>>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
>>>>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>>>
>>>
>>> --
>>> Kind Regards
>>>
>>> Timo Walther
>>>
>>> Follow me: @twalthr
>>> https://www.linkedin.com/in/twalthr