Hi Fabian, Timo, 

I have created a prototype for removing STREAM keyword and using batch sql 
parser for stream jobs. 

This is the working brach:  https://github.com/wuchong/flink/tree/remove-stream 
<https://github.com/wuchong/flink/tree/remove-stream>

Looking forward to your feedback.

- Jark Wu 

> 在 2016年8月24日,下午4:56,Fabian Hueske <fhue...@gmail.com> 写道:
> 
> Starting with a prototype would be great, Jark.
> We had some trouble with Calcite's StreamableTable interface anyways. A few
> things can be simplified if we do not declare our tables as streamable.
> I would try to implement DataStreamTable (and all related classes and
> methods) equivalent to DataSetTables if possible.
> 
> Best, Fabian
> 
> 2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
> 
>> Hi Fabian,
>> 
>> You are right, the main thing we need to change for removing STREAM
>> keyword is the table registration. If you would like, I can do a prototype.
>> 
>> Hi Timo,
>> 
>> I’m glad to contribute our work back to Flink. I will look into it and
>> create JIRAs next days.
>> 
>> - Jark Wu
>> 
>>> 在 2016年8月24日,上午12:13,Fabian Hueske <fhue...@gmail.com> 写道:
>>> 
>>> Hi Jark,
>>> 
>>> We can think about removing the STREAM keyword or not. In principle,
>>> Calcite should allow the same windowing syntax on streaming and static
>>> tables (this is one of the main goals of Calcite). The Table API can also
>>> distinguish stream and batch without the STREAM keyword by looking at the
>>> ExecutionEnvironment.
>>> I think we would need to change the way that tables are registered in
>>> Calcite's catalog and also add more validation (check that time windows
>>> refer to a time column, etc).
>>> A prototype should help to see what the consequence of removing the
>> STREAM
>>> keyword (which is actually, changing the table registration, the parser
>> is
>>> the same) would be.
>>> 
>>> Regarding streaming aggregates without window definition: We can
>> certainly
>>> implement this feature in the Table API. There are a few points that need
>>> to be considered like value expiration after a certain time of update
>>> inactivity (otherwise the state might grow infinitely). But these aspects
>>> should be rather easy to solve. I think for SQL, such running aggregates
>>> are a special case of the Sliding Windows as discussed in Calcite's
>>> StreamSQL document [1].
>>> 
>>> Thanks also for the document! I'll take that into account when sketching
>>> the FLIP for streaming aggregation support.
>>> 
>>> Cheers, Fabian
>>> 
>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows
>>> 
>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>> 
>>>> Hi Fabian, Timo,
>>>> 
>>>> Sorry for the late response.
>>>> 
>>>> Regarding Calcite’s StreamSQL syntax, what I concern is only the STREAM
>>>> keyword and no agg-without-window. Which makes different syntax for
>>>> streaming and static tables. I don’t think Flink should have a custom
>> SQL
>>>> syntax, but it’s better to have a consistent syntax for batch and
>>>> streaming. Regarding window syntax , I think it’s good and reasonable to
>>>> follow Calcite’s syntax. Actually, we implement Blink SQL Window
>> following
>>>> Calcite’s syntax[1].
>>>> 
>>>> In addition, I describe the Blink SQL design including UDF, UDTF, UDAF,
>>>> Window in google doc[1]. Hope that can help for the upcoming Flink SQL
>>>> design.
>>>> 
>>>> +1 for creating FLIP
>>>> 
>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzb
>>>> buVFPZWBYuY1Ek
>>>> 
>>>> 
>>>> - Jark Wu
>>>> 
>>>>> 在 2016年8月23日,下午3:47,Fabian Hueske <fhue...@gmail.com> 写道:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I did a bit of prototyping yesterday to check to what extend Calcite
>>>>> supports window operations on streams if we would implement them for
>> the
>>>>> Table API.
>>>>> For the Table API we do not go through Calcite's SQL parser and
>>>> validator,
>>>>> but generate the logical plan (tree of RelNodes) ourselves mostly using
>>>>> Calcite's Relbuilder.
>>>>> It turns out that Calcite does not restrict grouped aggregations on
>>>> streams
>>>>> at this abstraction level, i.e., it does not perform any checks.
>>>>> 
>>>>> I think it should be possible to implement windowed aggregates for the
>>>>> Table API. Once CALCITE-1345 [1] is implemented (and released),
>> windowed
>>>>> aggregates are also supported by the SQL parser, validator, and
>>>> optimizer.
>>>>> In order to make them work with our implementation we would need to
>> adapt
>>>>> our solution to it (only internally), but I am sure we could reuse a
>> lot
>>>> of
>>>>> our initial implementation (Table API, validation, execution).
>>>>> 
>>>>> I drafted an API proposal a few months ago [2] and could convert this
>>>> into
>>>>> a FLIP to discuss the API and break it down into subtasks.
>>>>> 
>>>>> What do you think?
>>>>> 
>>>>> Cheers, Fabian
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>> [2]
>>>>> https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o
>>>> 3AyCh2ePqr3V5E
>>>>> 
>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>> 
>>>>>> Hi Jark,
>>>>>> 
>>>>>> thanks for starting this discussion. Actually, I think we are rather
>>>>>> "blocked" on the internal handling of streaming windows in Calcite
>> than
>>>> the
>>>>>> SQL parser. IMO, it should be possible to exchange or modify the
>> parser
>>>> if
>>>>>> we want that.
>>>>>> 
>>>>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword,
>>>>>> Calcite closely follows the SQL standard (e.g.,no special keywords
>> like
>>>>>> WINDOW. Instead stream specific aspects like tumbling windows are done
>>>> as
>>>>>> functions such as TUMBLE [1]). One main motivation of the Calcite
>>>> community
>>>>>> is to have the same syntax for streaming and static tables. This
>>>> includes
>>>>>> support for tables which are static and streaming at the same time
>> (the
>>>>>> example of [1] is a table about orders to which new order records are
>>>>>> added). When querying such a table, the STREAM keyword is required to
>>>>>> distinguish the cases of a batch query which returns a result set and
>> a
>>>>>> standing query which returns a result stream. In the context of Flink
>> we
>>>>>> can can do the distinction using the type of the TableEnvironment. So
>> we
>>>>>> could use the batch parser, but would need to change a couple things
>>>>>> internally and add checks for proper grouping on the timestamp column
>>>> when
>>>>>> doing windows, etc. So far the discussion about the StreamSQL syntax
>>>> rather
>>>>>> focused on the question whether 1) StreamSQL should follow the SQL
>>>> standard
>>>>>> (as Calcite proposes) or 2) whether Flink should use a custom syntax
>>>> with
>>>>>> stream specific features. For instance a tumbling window is expressed
>> in
>>>>>> the GROUP BY clause [1] when following standard SQL but it could be
>>>> defined
>>>>>> using a special WINDOW keyword in a custom StreamSQL dialect.
>>>>>> 
>>>>>> You are right that we have a dependency on Calcite. However, I think
>>>> this
>>>>>> dependency is rather in the internals than the parser, i.e., how does
>>>> the
>>>>>> validator/optimizer support and handle monotone / quasi-monotone
>>>> attributes
>>>>>> and windows. I am not sure how much is already supported but the
>> Calcite
>>>>>> community is working on this [2]. I think we need these features in
>>>> Calcite
>>>>>> unless we want to completely remove our dependency on Calcite for
>>>>>> StreamSQL. I would not be in favor of removing Calcite at this point.
>> We
>>>>>> put a lot of effort into refactoring the Table API internals. Instead
>> we
>>>>>> should start to talk to the Calcite community and see how far they
>> are,
>>>>>> what is missing, and how we can help.
>>>>>> 
>>>>>> I will start a discussion on the Calcite dev mailing list in the next
>>>> days
>>>>>> and ask about the status of StreamSQL.
>>>>>> 
>>>>>> Best,
>>>>>> Fabian
>>>>>> 
>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-
>>>> windows-improved
>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>> 
>>>> 
>>>> 
>> 
>> 

Reply via email to