It seems that we have done that (?).  The 
BatchTableEnvironment.registerTableSource(name, tableSource) only accept a 
BatchTableSource. In contrast, the 
StreamTableEnvironment.registerTableSource(name, tableSource) only accept a 
StreamTableSource. So that, if a TableSource implements both batch and stream, 
we will determine batch or stream by the type of table environment.  I think 
the TableSourceITCase.testCsvTableSource in batch and stream package can 
explain it.  Am I right ? 

- Jark Wu 

> 在 2016年8月29日,下午8:59,Timo Walther <twal...@apache.org> 写道:
> 
> At first glance, I thought we are losing the possibility to distingish 
> between choosing a batch or streaming table if a TableSource implements both. 
> Because currently you are using a StreamTableSource as default if a 
> TableSource implements both types. I think it would be better to determine 
> batch or stream using the type of execution environment. What do you think?
> 
> Timo
> 
> 
> Am 29/08/16 um 14:31 schrieb Jark Wu:
>> Hi Timo,
>> 
>> Yes, you are right. The STREAM keyword is invalid now. If there is a STREAM 
>> keyword in the query, the parser will throw "can’t convert table xxx to 
>> stream" exception. Because we register the table as a regular table not 
>> streamable.
>> 
>> - Jark Wu
>> 
>>> 在 2016年8月29日,下午8:13,Timo Walther <twal...@apache.org> 写道:
>>> 
>>> Hi Jark,
>>> 
>>> your code looks good and it also simplifies many parts. So the STREAM 
>>> keyword is not optional but invalid now, right? What happens if there is 
>>> keyword in the query?
>>> 
>>> Timo
>>> 
>>> 
>>> Am 29/08/16 um 05:40 schrieb Jark Wu:
>>>> Hi Fabian, Timo,
>>>> 
>>>> I have created a prototype for removing STREAM keyword and using batch sql 
>>>> parser for stream jobs.
>>>> 
>>>> This is the working brach:  
>>>> https://github.com/wuchong/flink/tree/remove-stream 
>>>> <https://github.com/wuchong/flink/tree/remove-stream>
>>>> 
>>>> Looking forward to your feedback.
>>>> 
>>>> - Jark Wu
>>>> 
>>>>> 在 2016年8月24日,下午4:56,Fabian Hueske <fhue...@gmail.com> 写道:
>>>>> 
>>>>> Starting with a prototype would be great, Jark.
>>>>> We had some trouble with Calcite's StreamableTable interface anyways. A 
>>>>> few
>>>>> things can be simplified if we do not declare our tables as streamable.
>>>>> I would try to implement DataStreamTable (and all related classes and
>>>>> methods) equivalent to DataSetTables if possible.
>>>>> 
>>>>> Best, Fabian
>>>>> 
>>>>> 2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>> 
>>>>>> Hi Fabian,
>>>>>> 
>>>>>> You are right, the main thing we need to change for removing STREAM
>>>>>> keyword is the table registration. If you would like, I can do a 
>>>>>> prototype.
>>>>>> 
>>>>>> Hi Timo,
>>>>>> 
>>>>>> I’m glad to contribute our work back to Flink. I will look into it and
>>>>>> create JIRAs next days.
>>>>>> 
>>>>>> - Jark Wu
>>>>>> 
>>>>>>> 在 2016年8月24日,上午12:13,Fabian Hueske <fhue...@gmail.com> 写道:
>>>>>>> 
>>>>>>> Hi Jark,
>>>>>>> 
>>>>>>> We can think about removing the STREAM keyword or not. In principle,
>>>>>>> Calcite should allow the same windowing syntax on streaming and static
>>>>>>> tables (this is one of the main goals of Calcite). The Table API can 
>>>>>>> also
>>>>>>> distinguish stream and batch without the STREAM keyword by looking at 
>>>>>>> the
>>>>>>> ExecutionEnvironment.
>>>>>>> I think we would need to change the way that tables are registered in
>>>>>>> Calcite's catalog and also add more validation (check that time windows
>>>>>>> refer to a time column, etc).
>>>>>>> A prototype should help to see what the consequence of removing the
>>>>>> STREAM
>>>>>>> keyword (which is actually, changing the table registration, the parser
>>>>>> is
>>>>>>> the same) would be.
>>>>>>> 
>>>>>>> Regarding streaming aggregates without window definition: We can
>>>>>> certainly
>>>>>>> implement this feature in the Table API. There are a few points that 
>>>>>>> need
>>>>>>> to be considered like value expiration after a certain time of update
>>>>>>> inactivity (otherwise the state might grow infinitely). But these 
>>>>>>> aspects
>>>>>>> should be rather easy to solve. I think for SQL, such running aggregates
>>>>>>> are a special case of the Sliding Windows as discussed in Calcite's
>>>>>>> StreamSQL document [1].
>>>>>>> 
>>>>>>> Thanks also for the document! I'll take that into account when sketching
>>>>>>> the FLIP for streaming aggregation support.
>>>>>>> 
>>>>>>> Cheers, Fabian
>>>>>>> 
>>>>>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows
>>>>>>> 
>>>>>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>>>> 
>>>>>>>> Hi Fabian, Timo,
>>>>>>>> 
>>>>>>>> Sorry for the late response.
>>>>>>>> 
>>>>>>>> Regarding Calcite’s StreamSQL syntax, what I concern is only the STREAM
>>>>>>>> keyword and no agg-without-window. Which makes different syntax for
>>>>>>>> streaming and static tables. I don’t think Flink should have a custom
>>>>>> SQL
>>>>>>>> syntax, but it’s better to have a consistent syntax for batch and
>>>>>>>> streaming. Regarding window syntax , I think it’s good and reasonable 
>>>>>>>> to
>>>>>>>> follow Calcite’s syntax. Actually, we implement Blink SQL Window
>>>>>> following
>>>>>>>> Calcite’s syntax[1].
>>>>>>>> 
>>>>>>>> In addition, I describe the Blink SQL design including UDF, UDTF, UDAF,
>>>>>>>> Window in google doc[1]. Hope that can help for the upcoming Flink SQL
>>>>>>>> design.
>>>>>>>> 
>>>>>>>> +1 for creating FLIP
>>>>>>>> 
>>>>>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzb
>>>>>>>> buVFPZWBYuY1Ek
>>>>>>>> 
>>>>>>>> 
>>>>>>>> - Jark Wu
>>>>>>>> 
>>>>>>>>> 在 2016年8月23日,下午3:47,Fabian Hueske <fhue...@gmail.com> 写道:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> I did a bit of prototyping yesterday to check to what extend Calcite
>>>>>>>>> supports window operations on streams if we would implement them for
>>>>>> the
>>>>>>>>> Table API.
>>>>>>>>> For the Table API we do not go through Calcite's SQL parser and
>>>>>>>> validator,
>>>>>>>>> but generate the logical plan (tree of RelNodes) ourselves mostly 
>>>>>>>>> using
>>>>>>>>> Calcite's Relbuilder.
>>>>>>>>> It turns out that Calcite does not restrict grouped aggregations on
>>>>>>>> streams
>>>>>>>>> at this abstraction level, i.e., it does not perform any checks.
>>>>>>>>> 
>>>>>>>>> I think it should be possible to implement windowed aggregates for the
>>>>>>>>> Table API. Once CALCITE-1345 [1] is implemented (and released),
>>>>>> windowed
>>>>>>>>> aggregates are also supported by the SQL parser, validator, and
>>>>>>>> optimizer.
>>>>>>>>> In order to make them work with our implementation we would need to
>>>>>> adapt
>>>>>>>>> our solution to it (only internally), but I am sure we could reuse a
>>>>>> lot
>>>>>>>> of
>>>>>>>>> our initial implementation (Table API, validation, execution).
>>>>>>>>> 
>>>>>>>>> I drafted an API proposal a few months ago [2] and could convert this
>>>>>>>> into
>>>>>>>>> a FLIP to discuss the API and break it down into subtasks.
>>>>>>>>> 
>>>>>>>>> What do you think?
>>>>>>>>> 
>>>>>>>>> Cheers, Fabian
>>>>>>>>> 
>>>>>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>> [2]
>>>>>>>>> https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o
>>>>>>>> 3AyCh2ePqr3V5E
>>>>>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>>>> 
>>>>>>>>>> Hi Jark,
>>>>>>>>>> 
>>>>>>>>>> thanks for starting this discussion. Actually, I think we are rather
>>>>>>>>>> "blocked" on the internal handling of streaming windows in Calcite
>>>>>> than
>>>>>>>> the
>>>>>>>>>> SQL parser. IMO, it should be possible to exchange or modify the
>>>>>> parser
>>>>>>>> if
>>>>>>>>>> we want that.
>>>>>>>>>> 
>>>>>>>>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword,
>>>>>>>>>> Calcite closely follows the SQL standard (e.g.,no special keywords
>>>>>> like
>>>>>>>>>> WINDOW. Instead stream specific aspects like tumbling windows are 
>>>>>>>>>> done
>>>>>>>> as
>>>>>>>>>> functions such as TUMBLE [1]). One main motivation of the Calcite
>>>>>>>> community
>>>>>>>>>> is to have the same syntax for streaming and static tables. This
>>>>>>>> includes
>>>>>>>>>> support for tables which are static and streaming at the same time
>>>>>> (the
>>>>>>>>>> example of [1] is a table about orders to which new order records are
>>>>>>>>>> added). When querying such a table, the STREAM keyword is required to
>>>>>>>>>> distinguish the cases of a batch query which returns a result set and
>>>>>> a
>>>>>>>>>> standing query which returns a result stream. In the context of Flink
>>>>>> we
>>>>>>>>>> can can do the distinction using the type of the TableEnvironment. So
>>>>>> we
>>>>>>>>>> could use the batch parser, but would need to change a couple things
>>>>>>>>>> internally and add checks for proper grouping on the timestamp column
>>>>>>>> when
>>>>>>>>>> doing windows, etc. So far the discussion about the StreamSQL syntax
>>>>>>>> rather
>>>>>>>>>> focused on the question whether 1) StreamSQL should follow the SQL
>>>>>>>> standard
>>>>>>>>>> (as Calcite proposes) or 2) whether Flink should use a custom syntax
>>>>>>>> with
>>>>>>>>>> stream specific features. For instance a tumbling window is expressed
>>>>>> in
>>>>>>>>>> the GROUP BY clause [1] when following standard SQL but it could be
>>>>>>>> defined
>>>>>>>>>> using a special WINDOW keyword in a custom StreamSQL dialect.
>>>>>>>>>> 
>>>>>>>>>> You are right that we have a dependency on Calcite. However, I think
>>>>>>>> this
>>>>>>>>>> dependency is rather in the internals than the parser, i.e., how does
>>>>>>>> the
>>>>>>>>>> validator/optimizer support and handle monotone / quasi-monotone
>>>>>>>> attributes
>>>>>>>>>> and windows. I am not sure how much is already supported but the
>>>>>> Calcite
>>>>>>>>>> community is working on this [2]. I think we need these features in
>>>>>>>> Calcite
>>>>>>>>>> unless we want to completely remove our dependency on Calcite for
>>>>>>>>>> StreamSQL. I would not be in favor of removing Calcite at this point.
>>>>>> We
>>>>>>>>>> put a lot of effort into refactoring the Table API internals. Instead
>>>>>> we
>>>>>>>>>> should start to talk to the Calcite community and see how far they
>>>>>> are,
>>>>>>>>>> what is missing, and how we can help.
>>>>>>>>>> 
>>>>>>>>>> I will start a discussion on the Calcite dev mailing list in the next
>>>>>>>> days
>>>>>>>>>> and ask about the status of StreamSQL.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Fabian
>>>>>>>>>> 
>>>>>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-
>>>>>>>> windows-improved
>>>>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>>> 
>>> 
>>> -- 
>>> Freundliche Grüße / Kind Regards
>>> 
>>> Timo Walther
>>> 
>>> Follow me: @twalthr
>>> https://www.linkedin.com/in/twalthr
>> 
> 
> 
> -- 
> Freundliche Grüße / Kind Regards
> 
> Timo Walther
> 
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr

Reply via email to