In the SIGMOD paper, EMIT could be applied to any query. Think of it as executing the query at T, then executing it at T+delta, comparing the results, and emitting any added or removed records.
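To make that reading concrete, here is a minimal sketch (mine, not from the paper) that runs a query against an in-memory SQLite table at two points in time and diffs the two results. The table contents and the helper names snapshot and emit_delta are made up for illustration, and a real engine would presumably compute the delta incrementally rather than re-run the query.

```python
import sqlite3
from collections import Counter


def snapshot(conn, query):
    """Run the query and return its result as a multiset of rows."""
    return Counter(conn.execute(query).fetchall())


def emit_delta(before, after):
    """Compare two snapshots; return (retracted rows, added rows)."""
    # Counter subtraction keeps only positive counts, so each side
    # contains exactly the rows that disappeared or appeared.
    return before - after, after - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (product INTEGER, amount INTEGER)")
# Hypothetical starting data: product 100 sums to 6.
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)", [(100, 1), (100, 5), (200, 3)]
)

query = "SELECT product, SUM(amount) FROM orders GROUP BY product"

at_t = snapshot(conn, query)                          # query at T
conn.execute("INSERT INTO orders VALUES (100, 2)")    # a new order arrives
at_t_delta = snapshot(conn, query)                    # query at T+delta

retracted, added = emit_delta(at_t, at_t_delta)
print("retracted:", sorted(retracted.elements()))  # retracted: [(100, 6)]
print("added:", sorted(added.elements()))          # added: [(100, 8)]
```

Nothing in emit_delta knows whether the query is an aggregate; the unchanged row for product 200 simply produces no output.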
Tying it to aggregate queries seems wrong. (If we have a special semantics for aggregate queries, how are we possibly going to make the semantics well-defined for, say, user-defined table functions?)

Yes, I know aggregate queries have weird behavior. If you have computed 'select product, sum(amount) from orders group by product', and a new order arrives with (product 100, amount 2), then you are going to see (100, 6) retracted and (100, 8) added. But I think we have to live with that. Otherwise EMIT semantics get a lot more complicated.

Julian

> On Jan 21, 2020, at 1:24 PM, Rui Wang <[email protected]> wrote:
>
> I think there was a big missing in my summary about the position of EMIT
> and the execution order (and I forgot about ORDER BY). Try to address them
> here:
>
> SELECT
>
> [FROM TVF windowing] // windowing happen here
>
> [WHERE clause]
>
> [GROUP BY clause]
>
> [HAVING clause]
>
> [ORDER BY clause]
>
> [LIMIT clause]
> [EMIT clause] // materialization latency
>
> The position of EMIT is indeed a bit confusing. As the right execution
> order should be: FROM -> EMIT -> others like normal query. FROM is
> continuously generating data and EMIT decide when to emit a part of data,
> and then other clauses are applied to emitted data and update downstream.
>
> So at least two open questions:
> 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
> like COUNT(*)), EMIT WHERE (can only use single column alias like ORDER BY)
> or EMIT AFTER (undefined yet if we want to support expressions, I hope we
> do).
> 2. Where is the EMIT clause? Maybe the most clear position is to put it
> after FROM.
>
>
> -Rui
>
>
> On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <[email protected]> wrote:
>
>>
>>
>> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>>
>>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>>> and HAVING), or is it just a coincidence that you use the same word,
>>> HAVING?
>>>
>>
>> EMIT HAVING is independent from HAVING, but EMIT HAVING does have a
>> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. It is a GROUP BY
>> key, then apply EMIT HAVING expressions on sets specified by those keys.
>> However we could loose the constraint to allow EMIT HAVING appears even
>> without GROUP BY, which just means that apply emit control on the whole
>> data set than control the emitting per group.
>>
>> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
>> (emit having to decide which part of data can be emitted) -> aggregation
>> (normal having and other aggregations). For batch/classic DB workload, the
>> EMIT step will always emit all data. So such idea is compatible with
>> existing DB users.
>>
>> I happen to choose EMIT HAVING because the emit control is very similar to
>> HAVING (and some bits of WHERE) that the only difference is: HAVING is a
>> filter while EMIT HAVING control the emit. E.g apply HAVING expressions to
>> data means if pass this data to downstream or not. And applying EMIT
>> HAVING expressions means if pass this data to downstream now or later (or
>> discard it if the window closes).
>>
>> If you think the idea actually causes confusion rather than convenience to
>> onboard people to use streaming sql, we can replace EMIT HAVING by EMIT
>> AFTER, per the original design.
>>
>>> I support the idea of latency controls, but I am nervous about
>>> allowing full expressions in the EMIT clause if we don't have to.
>>>
>>>
>> Yep. It's a design choice of allowing expressions or keeping a set of
>> dedicated SQL syntax for some emit strategies. If don't use extensible EMIT
>> expressions, we likely will need to provide a long list of syntax for
>> different emit strategies. For examples:
>> EMIT AFTER WATERMARK
>> EMIT AFTER DELAY
>> EMIT AFTER AFTER WATERMARK BUT LATE
>> EMIT AFTER COUNT
>> EMIT AFTER BEFORE WATERMARK
>> etc.
>>
>> Again it's a design choice so I am open to both ideas.
>>
>> However, personally I prefer the EMIT expressions idea because I found it
>> was very easy to explain EMIT expressions to SQL people who don't have much
>> streaming background. Basically you can say EMIT expressions are just
>> applied to rows of table from table-valued function. If there are GROUP BY,
>> each apply expression to each group accordingly. and the result of
>> expression indicates if it's ready to emit. This easiness is also mainly
>> from that we have introduced window start and window end into the table,
>> thus we should have all data we needed in table to write expressions
>> against them (except for processing time triggers).
>>
>> The downside of expressions though is people will be allowed to write any
>> expression they want, and engines will take responsibility to validate
>> those.
>>
>>
>>
>>> Aggregate queries have a peculiar scope. (You can only reference
>>> grouped expressions, for example.) I don't think we should drag that
>>> peculiar scope into the EMIT clause. The simplest thing is to only
>>> allow the EMIT clause to reference column aliases, which is similar to
>>> the scope used by ORDER BY.
>>>
>>>
>> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
>> just like WHERE, you will not reference aggregation columns but just column
>> aliases. It can solve the event time trigger and processing time triggers.
>> However, it has a shortcoming of cannot express: EMIT WHEN there are 100
>> elements accumulated, which require a COUNT(*).
>>
>>
>>
>>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>>> so, which comes first?
>>>
>>>
>> Sorry I forgot about ORDER BY. In my opinion EMIT should be applied after
>> FROM, but before all other WHERE, HAVING (or aggregation), ORDER BY. Note
>> that EMIT controls when to emit data from the streaming dataset, thus all
>> filters and aggregations should be applied after data is read to emit. Note
>> again to emphasize that for classic DB/batch cases, EMIT is explicitly
>> there which just EMIT all data once after FROM as all data is already
>> known.
>>
>>
>> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote:
>>
>>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>>> current_watermark() >= T.window_end AND current_watermark() <
>>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>>> wrapping the EMIT query as a sub-query and using ordinal SQL
>>> expressions on the system columns added by EMIT. (I'm not saying we
>>> should do this. But when designing a feature, it's worth calling out
>>> whether it adds power or whether it is syntactic sugar.)
>>
>>
>> Sounds like just do a normal WHERE based on the table from sub-query. yep
>> that's an option. Then open question from it is if we want to mix "EMIT"
>> semantic (which is not really a classic SQL filter) or "Filter" semantic
>> (WHERE and HAVING) into existing WHERE and HAVING. I prefer not to, to
>> leave classic DB queries unchanged (their WHERE is just a per row filter
>> which having any new semantic, e.g. latency control, added)
>>
>>
>>
>>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
>>>>
>>>> Hi community,
>>>>
>>>> First of all, thanks all your help on the CALCITE-3272 (TUMBLE now works as
>>>> a table-valued function). As it starts to work, I want to continue
>>>> discussing the following work of streaming sql: a way to control
>>>> materialization latency of a streaming query. A small note for people who
>>>> are not familiar with streaming: because streaming queries are long running
>>>> queries (maybe months or up to year), usually there is a need to see
>>>> results before query is terminated. Thus it will be desired to have a way
>>>> to allow users to specify, e.g. how frequently they want to see some result
>>>> from the query.
>>>>
>>>> (The following will be a summary of my proposal. I can convert it to a
>>>> design doc if people prefer that way. Just let me know)
>>>>
>>>> *Prior work*
>>>> My idea is built on top of "one sql to rule them all paper" [1]. Kudos to
>>>> people who contributed that paper, which became the foundation of my
>>>> discussion.
>>>>
>>>> From [1], an EMIT clause was actually proposed to be added to the end of
>>>> the query. Two syntax of EMIT clause was also proposed:
>>>> 1. EMIT after the watermark passes the end of the window. E.g. EMIT
>>>> [STREAM] AFTER WATERMARK.
>>>> 2. Delay emit for a period of time after a change happens (e.g. element
>>>> arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES
>>>>
>>>> *Typical Streaming emitting latency patterns*
>>>> 1. Event time triggers. Emitting depends on the relationship between
>>>> watermark and event timestamp of events. Check this video [2] if you want
>>>> to have an introduction of watermark in the streaming world, and data
>>>> completeness concept based on event-timestamp.
>>>> 2. Processing time triggers. Emitting depends on the system clock. This is
>>>> a natural idea of emitting. E.g. emit the current result every hour without
>>>> considering if data in a window is already complete.
>>>> 3. data-driven triggers. E.g. emit when accumulated events exceed a
>>>> threshold (e.g. emit when have accumulated 1000 events)
>>>> 4. Composite triggers. There is a need to concat 1, 2, 3 by OR and AND to
>>>> achieve better latency control.
>>>>
>>>> *Proposal to discuss*
>>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax and two
>>>> aggregation functions.
>>>>
>>>> *EMIT HAVING bool_expression*. EMIT HAVING syntax should be after normal
>>>> HAVING syntax and always works with a GROUP BY. EMIT HAVING behaves similar
>>>> to HAVING. There are two differences that are worth mentioning:
>>>> 1. It’s not a filter. It controls when table-valued function in FROM clause
>>>> should emit a set. The set is specified by group-by keys.
>>>> 2. GROUP BY keys are visible to EMIT HAVING while it is not the case for
>>>> HAVING.
>>>>
>>>> *current_watermark().* current_watermark() is an aggregation function that
>>>> returns a timestamp that provides where watermark is for a set.
>>>>
>>>> *processing_time_since_first_element().* processing_time_since_first_element()
>>>> is an aggregation function that returns the system clock time (i.e.
>>>> processing time) since the first element appears in the set.
>>>>
>>>> *Motivating examples*
>>>> The following will be a list of motivating examples of how this proposal
>>>> will achieve different ways of latency control.
>>>>
>>>> Assume there is a streaming query that apply fixed windowing (thus TUMBLE)
>>>> as the following:
>>>>
>>>> “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS
>>>> T”. Based on this query, let’s say derived table T from orders has the
>>>> following schema:
>>>>
>>>> order_id BIGINT
>>>> product_id BIGINT
>>>> order_meta STRING
>>>> event_ts TIMESTAMP
>>>> window_start TIMESTAMP
>>>> window_end TIMESTAMP
>>>>
>>>> The following will be a table to demonstrate for different trigger cases,
>>>> how the query will be modified by adopting EMIT syntax to express the same
>>>> semantic:
>>>>
>>>> Trigger
>>>> SQL Query
>>>>
>>>> AfterWatermark.pastEndOfWindow()
>>>> (emit after watermark passes end of window, thus for a window data is
>>>> believed complete)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> EMIT HAVING current_watermark() >= T.window_end
>>>>
>>>> AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5))
>>>> (emit after a delay of a 5 minutes when first element appear in window)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> GROUP BY T.window_end
>>>> EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>>
>>>> AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>>>> (emit after every event appears after data is really believed complete in a
>>>> window)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> GROUP BY T.window_end
>>>> EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>>>>
>>>> AfterWatermark.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>> .plusDuration(Duration.standardMinutes(5)))
>>>> (emit before a window is complete, by following the delay emit strategy)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> GROUP BY T.window_end
>>>> EMIT HAVING current_watermark() < T.window_end AND
>>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>>
>>>> AfterWatermark.pastEndOfWindow()
>>>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>> .plusDuration(Duration.standardMinutes(5)))
>>>> .withLateFirings(AfterPane.elementCountAtLeast(1))
>>>> (a combination of emitting before window closes, emit on window closes and
>>>> emit after window closes)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> GROUP BY T.window_end
>>>> EMIT HAVING
>>>> current_watermark() >= T.window_end OR
>>>> (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>>>> (current_watermark() < T.window_end AND
>>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>>>>
>>>> AfterWatermark.pastEndOfWindow()
>>>> .withAllowedLateness(Duration.standardDays(2))
>>>> (emit after window closes plus a tolerance of 2 days late data)
>>>>
>>>> SELECT *
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> GROUP BY T.window_end
>>>> EMIT HAVING
>>>> current_watermark() >= T.window_end AND current_watermark() < T.window_end
>>>> + INTERVAL 2 DAY
>>>>
>>>> Composite triggers
>>>> illustrated by examples above that AND and OR can be used to concat
>>>> different bool expressions.
>>>>
>>>>
>>>>
>>>> Please let me know your thoughts and any other way you prefer to continue
>>>> discussing.
>>>>
>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>>
>>>>
>>>> Thanks,
>>>> Rui Wang
>>>
>>
