I think there was a big gap in my summary regarding the position of EMIT and the execution order (and I forgot about ORDER BY). Let me try to address them here:

    SELECT
    [FROM TVF windowing]   // windowing happens here
    [WHERE clause]
    [GROUP BY clause]
    [HAVING clause]
    [ORDER BY clause]
    [LIMIT clause]
    [EMIT clause]          // materialization latency

The position of EMIT is indeed a bit confusing, because the right execution order should be: FROM -> EMIT -> the other clauses, as in a normal query. FROM continuously generates data, EMIT decides when to emit a part of that data, and then the other clauses are applied to the emitted data and the results update downstream.

So there are at least two open questions:

1. What form should EMIT take? EMIT HAVING (can use aggregate columns such as COUNT(*)), EMIT WHERE (can only reference column aliases, like ORDER BY), or EMIT AFTER (it is not yet decided whether it should support expressions; I hope it does).
2. Where does the EMIT clause go? Perhaps the clearest position is right after FROM.

-Rui

On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <[email protected]> wrote:
>
> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>
>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY and HAVING), or is it just a coincidence that you use the same word, HAVING?
>
> EMIT HAVING is independent of HAVING, but EMIT HAVING does have a relationship to GROUP BY: EMIT HAVING requires a GROUP BY. The GROUP BY keys define the sets, and EMIT HAVING expressions are then applied to the sets specified by those keys. However, we could loosen the constraint to allow EMIT HAVING to appear even without GROUP BY, which just means applying emit control to the whole data set rather than controlling emission per group.
>
> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control (EMIT HAVING decides which part of the data can be emitted) -> aggregation (normal HAVING and other aggregations). For batch/classic DB workloads, the EMIT step always emits all data, so this idea is compatible with existing DB users.
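The ordering described above (grouping keys, then EMIT, then the remaining clauses) can be modelled as one evaluation round over the rows produced so far. This is a minimal, hypothetical Python sketch, not Calcite code. The function name `evaluate_round`, the dict rows, the integer timestamps and the single watermark value are all assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Row = Dict[str, int]

def evaluate_round(rows: List[Row],
                   group_key: str,
                   emit_ready: Callable[[int, List[Row]], bool],
                   where: Callable[[Row], bool]) -> Dict[int, int]:
    """One hypothetical round: FROM -> GROUP BY keys -> EMIT -> WHERE and COUNT(*)."""
    # FROM: `rows` stands for everything the windowing TVF has produced so far.
    # GROUP BY: the grouping keys define the sets that EMIT controls.
    groups: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row)

    result: Dict[int, int] = {}
    for key, members in groups.items():
        # EMIT: is this set ready to be emitted now? If not, it waits for a later round.
        if not emit_ready(key, members):
            continue
        # The remaining clauses only see emitted data (here WHERE, then COUNT(*)).
        matched = sum(1 for r in members if where(r))
        if matched:  # as in SQL, a group with no surviving rows yields no output row
            result[key] = matched
    return result

rows = [
    {"window_end": 10, "product_id": 1},
    {"window_end": 10, "product_id": 2},
    {"window_end": 20, "product_id": 1},
]
watermark = 15
# Streaming: only windows whose end the watermark has passed are emitted.
streaming = evaluate_round(rows, "window_end", lambda end, _: watermark >= end,
                           lambda r: r["product_id"] == 1)
# Batch: EMIT implicitly emits everything once, giving the classic result.
batch = evaluate_round(rows, "window_end", lambda end, _: True,
                       lambda r: r["product_id"] == 1)
print(streaming)  # {10: 1}
print(batch)      # {10: 1, 20: 1}
```

With an always-true emit condition, the round reduces to an ordinary WHERE + GROUP BY + COUNT(*). That is the batch-compatibility argument made above.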
>
> I happened to choose EMIT HAVING because emit control is very similar to HAVING (and to some bits of WHERE); the only difference is that HAVING is a filter while EMIT HAVING controls emission. Applying HAVING expressions to data decides whether to pass the data downstream or not, whereas applying EMIT HAVING expressions decides whether to pass the data downstream now or later (or to discard it if the window closes).
>
> If you think the idea causes confusion rather than making it easier to onboard people to streaming SQL, we can replace EMIT HAVING with EMIT AFTER, per the original design.
>
>> I support the idea of latency controls, but I am nervous about allowing full expressions in the EMIT clause if we don't have to.
>
> Yep. It's a design choice between allowing expressions and keeping a set of dedicated SQL syntax for particular emit strategies. If we don't use extensible EMIT expressions, we will likely need to provide a long list of syntax for different emit strategies. For example:
>   EMIT AFTER WATERMARK
>   EMIT AFTER DELAY
>   EMIT AFTER WATERMARK BUT LATE
>   EMIT AFTER COUNT
>   EMIT AFTER BEFORE WATERMARK
>   etc.
>
> Again, it's a design choice, so I am open to both ideas.
>
> However, I personally prefer the EMIT expressions idea because I found it very easy to explain EMIT expressions to SQL people who don't have much streaming background. Basically, you can say EMIT expressions are just applied to the rows of the table from the table-valued function. If there is a GROUP BY, the expression is applied to each group accordingly, and the result of the expression indicates whether the group is ready to emit. This simplicity mainly comes from having introduced window start and window end into the table, so the table holds all the data we need to write expressions against (except for processing time triggers).
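The HAVING versus EMIT HAVING distinction can be modelled as the set of possible outcomes for a group. This is a minimal Python sketch. It relies on the assumption, taken from the paragraph above, that a group whose EMIT HAVING condition is still false when its window closes is discarded. The `Decision` names are made up for illustration.

```python
from enum import Enum

class Decision(Enum):
    PASS = "pass downstream"
    DROP = "drop"
    EMIT_NOW = "emit now"
    WAIT = "hold for a later evaluation"
    DISCARD = "discard (window closed)"

def having(predicate_holds: bool) -> Decision:
    # HAVING is a filter: a group is either passed downstream or dropped for good.
    return Decision.PASS if predicate_holds else Decision.DROP

def emit_having(predicate_holds: bool, window_closed: bool) -> Decision:
    # EMIT HAVING is not a filter: a false result only postpones the group,
    # unless its window has already closed, in which case it is discarded.
    if predicate_holds:
        return Decision.EMIT_NOW
    return Decision.DISCARD if window_closed else Decision.WAIT

for holds, closed in [(True, False), (False, False), (False, True)]:
    print(holds, closed, having(holds).value, "|", emit_having(holds, closed).value)
```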
>
> The downside of expressions, though, is that people will be allowed to write any expression they want, and engines will have to take responsibility for validating them.
>
>> Aggregate queries have a peculiar scope. (You can only reference grouped expressions, for example.) I don't think we should drag that peculiar scope into the EMIT clause. The simplest thing is to only allow the EMIT clause to reference column aliases, which is similar to the scope used by ORDER BY.
>
> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE, just like WHERE, you cannot reference aggregate columns, only column aliases. It can handle event time triggers and processing time triggers. However, it cannot express "emit when 100 elements have accumulated", which requires a COUNT(*).
>
>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If so, which comes first?
>
> Sorry, I forgot about ORDER BY. In my opinion EMIT should be applied after FROM, but before WHERE, HAVING (or aggregation), and ORDER BY. Note that EMIT controls when to emit data from the streaming dataset, so all filters and aggregations should be applied after the data is ready to emit. To emphasize again: for classic DB/batch cases, EMIT is implicitly there, and it simply emits all data once after FROM, because all the data is already known.
>
> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote:
>
>> One more thought. Since EMIT produces a relation, your "EMIT HAVING current_watermark() >= T.window_end AND current_watermark() < T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by wrapping the EMIT query as a sub-query and using ordinary SQL expressions on the system columns added by EMIT. (I'm not saying we should do this. But when designing a feature, it's worth calling out whether it adds power or whether it is syntactic sugar.)
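The power-versus-syntactic-sugar question can be checked on a toy model. This sketch assumes an inner query that emits at every evaluation and exposes the watermark at emission time as a system column. The column name `emit_watermark` and the helper functions are hypothetical. Under these assumptions, an ordinary outer WHERE on that column selects the same emissions as the 2-day EMIT HAVING condition.

```python
from typing import Dict, List

DAY = 24 * 60 * 60  # seconds

def emit_having_lateness(window_end: int, watermark: int) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end
    #   AND current_watermark() < T.window_end + INTERVAL 2 DAY
    return window_end <= watermark < window_end + 2 * DAY

def inner_query(window_end: int, watermarks: List[int]) -> List[Dict[str, int]]:
    # Hypothetical inner query: emits at every evaluation and tags each row
    # with the watermark at emission time (an assumed system column).
    return [{"window_end": window_end, "emit_watermark": wm} for wm in watermarks]

def outer_where(rows: List[Dict[str, int]]) -> List[Dict[str, int]]:
    # Ordinary per-row WHERE over the sub-query's output.
    return [r for r in rows
            if r["window_end"] <= r["emit_watermark"] < r["window_end"] + 2 * DAY]

watermarks = [0, 600, 600 + DAY, 600 + 3 * DAY]
via_subquery = [r["emit_watermark"] for r in outer_where(inner_query(600, watermarks))]
via_emit = [wm for wm in watermarks if emit_having_lateness(600, wm)]
print(via_subquery == via_emit)  # True
```

This only shows equivalence for this one condition in this model. It says nothing about whether WHERE should carry emit semantics, which is the concern raised in the reply.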
>
> That sounds like just doing a normal WHERE on the table from the sub-query. Yep, that's an option. The open question it raises is whether we want to mix "EMIT" semantics (which is not really a classic SQL filter) with "filter" semantics (WHERE and HAVING) in the existing WHERE and HAVING. I prefer not to, so that classic DB queries stay unchanged (their WHERE is just a per-row filter without any new semantics, e.g. latency control, added).
>
>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
>> >
>> > Hi community,
>> >
>> > First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works as a table-valued function). As it starts to work, I want to continue discussing the next piece of streaming SQL work: a way to control the materialization latency of a streaming query. A small note for people who are not familiar with streaming: because streaming queries are long-running (maybe months, or up to a year), there is usually a need to see results before the query terminates. Thus it is desirable to have a way for users to specify, e.g., how frequently they want to see results from the query.
>> >
>> > (The following is a summary of my proposal. I can convert it to a design doc if people prefer that. Just let me know.)
>> >
>> > *Prior work*
>> > My idea is built on top of the "One SQL to Rule Them All" paper [1]. Kudos to the people who contributed to that paper, which became the foundation of this discussion.
>> >
>> > In [1], an EMIT clause was proposed to be added to the end of the query. Two syntaxes for the EMIT clause were also proposed:
>> > 1. EMIT after the watermark passes the end of the window, e.g. EMIT [STREAM] AFTER WATERMARK.
>> > 2. Delay emitting for a period of time after a change happens (e.g. an element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES.
>> >
>> > *Typical streaming emitting latency patterns*
>> > 1. Event time triggers. Emitting depends on the relationship between the watermark and the event timestamps of events. Check this video [2] for an introduction to watermarks in the streaming world and the concept of data completeness based on event timestamps.
>> > 2. Processing time triggers. Emitting depends on the system clock. This is a natural way of emitting, e.g. emit the current result every hour without considering whether the data in a window is already complete.
>> > 3. Data-driven triggers, e.g. emit when accumulated events exceed a threshold (for example, emit once 1000 events have accumulated).
>> > 4. Composite triggers. There is a need to combine 1, 2, and 3 with OR and AND to achieve better latency control.
>> >
>> > *Proposal to discuss*
>> > I want to extend the proposal in [1] and propose the EMIT HAVING syntax and two aggregation functions.
>> >
>> > *EMIT HAVING bool_expression*. EMIT HAVING should come after the normal HAVING and always works with a GROUP BY. EMIT HAVING behaves similarly to HAVING. There are two differences worth mentioning:
>> > 1. It's not a filter. It controls when the table-valued function in the FROM clause should emit a set. The set is specified by the GROUP BY keys.
>> > 2. GROUP BY keys are visible to EMIT HAVING, while this is not the case for HAVING.
>> >
>> > *current_watermark()*. current_watermark() is an aggregation function that returns a timestamp indicating where the watermark is for a set.
>> >
>> > *processing_time_since_first_element()*. processing_time_since_first_element() is an aggregation function that returns the processing time (system clock time) elapsed since the first element appeared in the set.
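To make the two proposed aggregate functions concrete, the sketch below keeps hypothetical per-set state for one window. It then expresses the four latency patterns as predicates over that state. It assumes a single engine-wide watermark copied into each set, integer seconds for both clocks, and the 5-minute and 1000-event thresholds used in the text. `GroupState` and the trigger function names are illustrative only, not a proposed API.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class GroupState:
    """Hypothetical state an engine could keep for one GROUP BY set (one window)."""
    window_end: int                       # T.window_end, in seconds of event time
    watermark: int = 0                    # latest watermark seen (assumed engine-wide)
    count: int = 0                        # COUNT(*) so far
    first_seen_at: Optional[int] = None   # processing time of the first element

    def add_element(self, processing_now: int) -> None:
        if self.first_seen_at is None:
            self.first_seen_at = processing_now
        self.count += 1

def current_watermark(state: GroupState) -> int:
    return state.watermark

def processing_time_since_first_element(state: GroupState, processing_now: int) -> int:
    if state.first_seen_at is None:
        raise ValueError("the set has no elements yet")
    return processing_now - state.first_seen_at

# 1. Event time trigger: the watermark has passed the end of the window.
def event_time_trigger(s: GroupState) -> bool:
    return current_watermark(s) >= s.window_end

# 2. Processing time trigger: 5 minutes of system-clock time since the first element.
def processing_time_trigger(s: GroupState, now: int) -> bool:
    return processing_time_since_first_element(s, now) >= 5 * 60

# 3. Data-driven trigger: enough elements have accumulated.
def data_driven_trigger(s: GroupState) -> bool:
    return s.count >= 1000

# 4. Composite trigger: the others combined with OR (AND works the same way).
def composite_trigger(s: GroupState, now: int) -> bool:
    return event_time_trigger(s) or processing_time_trigger(s, now) or data_driven_trigger(s)

s = GroupState(window_end=600)
s.add_element(processing_now=100)
s.add_element(processing_now=150)
s.watermark = 590
print(event_time_trigger(s))                # False: 590 < 600
print(processing_time_trigger(s, now=400))  # True: 400 - 100 = 300 seconds
print(composite_trigger(s, now=200))        # False: no condition holds yet
```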
>> >
>> > *Motivating examples*
>> > The following is a list of motivating examples of how this proposal achieves different kinds of latency control.
>> >
>> > Assume there is a streaming query that applies fixed windowing (thus TUMBLE) as follows:
>> >
>> >   SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >
>> > Based on this query, let's say the derived table T from orders has the following schema:
>> >
>> >   order_id      BIGINT
>> >   product_id    BIGINT
>> >   order_meta    STRING
>> >   event_ts      TIMESTAMP
>> >   window_start  TIMESTAMP
>> >   window_end    TIMESTAMP
>> >
>> > The following table demonstrates, for different trigger cases, how the query would be modified by adopting the EMIT syntax to express the same semantics (trigger first, then the SQL query):
>> >
>> > Trigger: AfterWatermark.pastEndOfWindow()
>> > (emit after the watermark passes the end of the window, i.e. once the data for the window is believed complete)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   EMIT HAVING current_watermark() >= T.window_end
>> >
>> > Trigger: AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5))
>> > (emit after a delay of 5 minutes once the first element appears in the window)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >
>> > Trigger: AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
>> > (emit on every element that arrives after the data in a window is believed complete)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>> >
>> > Trigger: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5)))
>> > (emit before a window is complete, following the delayed emit strategy)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING current_watermark() < T.window_end AND
>> >     processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >
>> > Trigger: AfterWatermark.pastEndOfWindow()
>> >            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5)))
>> >            .withLateFirings(AfterPane.elementCountAtLeast(1))
>> > (a combination of emitting before the window closes, when the window closes, and after the window closes)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING
>> >     current_watermark() >= T.window_end OR
>> >     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>> >     (current_watermark() < T.window_end AND
>> >      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>> >
>> > Trigger: AfterWatermark.pastEndOfWindow().withAllowedLateness(Duration.standardDays(2))
>> > (emit after the window closes, plus a tolerance of 2 days for late data)
>> >
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING
>> >     current_watermark() >= T.window_end AND current_watermark() < T.window_end + INTERVAL 2 DAY
>> >
>> > Composite triggers: as illustrated by the examples above, AND and OR can be used to combine different boolean expressions.
>> >
>> > Please let me know your thoughts, and any other way you would prefer to continue the discussion.
>> >
>> > [1]: https://arxiv.org/pdf/1905.12133.pdf
>> > [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> >
>> > Thanks,
>> > Rui Wang
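As a sketch of how the EMIT HAVING conditions in the motivating-examples table behave, here they are transcribed as Python predicates over a snapshot of one GROUP BY T.window_end set. `Group`, its fields and the use of integer seconds for timestamps are assumptions for illustration. The comparisons are copied from the SQL as written, including `COUNT(*) > 1`.

```python
from dataclasses import dataclass

MINUTE = 60            # seconds
DAY = 24 * 60 * MINUTE

@dataclass(frozen=True)
class Group:
    """Hypothetical snapshot of one GROUP BY T.window_end set when EMIT HAVING is evaluated."""
    window_end: int    # T.window_end
    watermark: int     # current_watermark()
    since_first: int   # processing_time_since_first_element()
    count: int         # COUNT(*)

def on_time(g: Group) -> bool:
    # AfterWatermark.pastEndOfWindow()
    return g.watermark >= g.window_end

def delayed(g: Group) -> bool:
    # AfterProcessingTime.pastFirstElementInPane().plusDuration(5 minutes)
    return g.since_first >= 5 * MINUTE

def late(g: Group) -> bool:
    # ...withLateFirings(AfterPane.elementCountAtLeast(1)), as written in the table
    return g.watermark > g.window_end and g.count > 1

def early(g: Group) -> bool:
    # ...withEarlyFirings(5-minute processing-time delay)
    return g.watermark < g.window_end and delayed(g)

def early_on_time_late(g: Group) -> bool:
    return on_time(g) or late(g) or early(g)

def with_allowed_lateness(g: Group) -> bool:
    # current_watermark() >= T.window_end AND current_watermark() < T.window_end + INTERVAL 2 DAY
    return g.window_end <= g.watermark < g.window_end + 2 * DAY

before_end = Group(window_end=600, watermark=300, since_first=5 * MINUTE, count=3)
at_end = Group(window_end=600, watermark=600, since_first=0, count=1)
very_late = Group(window_end=600, watermark=600 + 3 * DAY, since_first=0, count=5)
print(early(before_end), on_time(before_end))                # True False
print(early_on_time_late(at_end), late(at_end))              # True False
print(on_time(very_late), with_allowed_lateness(very_late))  # True False
```

Evaluated as plain booleans, the composite condition is already true whenever `current_watermark() >= T.window_end` holds. The late-firing disjunct therefore adds nothing by itself. An engine would presumably also need to remember which firings already happened, to avoid re-emitting on every evaluation.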
