A small update: I tried to prototype support for just "EMIT AFTER WATERMARK". It seems that supporting such syntax in the planner, without alias references, is pretty straightforward.
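To make the intended behavior concrete, here is a minimal Python sketch of what "EMIT AFTER WATERMARK" would mean operationally over 10-minute tumbling windows. It is only a simulation under stated assumptions, not Calcite or planner code: the names WindowState, assign_window and run are hypothetical, timestamps are plain integers in minutes, and the watermark is supplied by the caller after each event.

```python
from dataclasses import dataclass, field

WINDOW_SIZE = 10  # minutes; illustrative tumbling-window width


@dataclass
class WindowState:
    """Rows buffered for one tumbling window (hypothetical helper type)."""
    window_start: int
    window_end: int
    rows: list = field(default_factory=list)
    emitted: bool = False


def assign_window(event_ts):
    """Return (window_start, window_end) of the tumbling window holding event_ts."""
    start = (event_ts // WINDOW_SIZE) * WINDOW_SIZE
    return start, start + WINDOW_SIZE


def run(events, watermarks):
    """Simulate EMIT AFTER WATERMARK.

    events: list of (event_ts, payload) in arrival order.
    watermarks: watermarks[i] is the watermark observed after events[i].
    Each window is emitted exactly once, when the watermark reaches its end.
    """
    windows = {}
    emitted = []
    for (event_ts, payload), watermark in zip(events, watermarks):
        start, end = assign_window(event_ts)
        state = windows.setdefault(start, WindowState(start, end))
        if not state.emitted:
            # Assumption: rows arriving after their window was emitted are dropped.
            state.rows.append(payload)
        for w in sorted(windows.values(), key=lambda s: s.window_start):
            if not w.emitted and watermark >= w.window_end:
                w.emitted = True
                emitted.append((w.window_start, w.window_end, list(w.rows)))
    return emitted


events = [(1, "a"), (4, "b"), (12, "c"), (25, "d")]
watermarks = [0, 5, 10, 20]
print(run(events, watermarks))
```

In this sketch the window [0, 10) is emitted when the watermark reaches 10, and [10, 20) when it reaches 20; the window [20, 30) stays buffered because the watermark never passes its end.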
Regarding the engine implementation (e.g. Enumerables), while I am still prototyping, I feel the implementation will be the same as "EMIT window_start >= current_watermark()": basically, we need a way for different engines to provide their watermark, and a function/operator seems like a handy way to do that. In this case, what appears in plans with EMIT could still be a RexNode that gives an expression between the watermark (an operator that is left for the engine to implement) and a column, etc.

-Rui

On Wed, Jan 22, 2020 at 12:18 PM Rui Wang <[email protected]> wrote:

> That makes sense. I will remove the aggregation constraint (i.e. EMIT
> requires a GROUP BY). Let's say now EMIT should work on any query for
> general purpose.
>
> Because the above contains too much information, let me further summarize
> the critical points here and see if we can reach a consensus:
>
> 1. Do we agree EMIT should execute after FROM, but before any other
> clauses, assuming EMIT works on any query?
> My opinion is that EMIT should execute after FROM. It actually matches what
> Julian has said: "Think of it as executing the query at T, then executing
> it at T+delta". EMIT just controls how large the delta is. And all other
> comparisons are just the following WHERE, GROUP BY, HAVING, ORDER BY,
> LIMIT, etc. It also matches DB cases, where EMIT produces a single
> delta, once, that spans from -inf to +inf on the timeline.
>
>
> 2. Can we support EMIT predicates rather than a list of fixed emit
> strategies?
> To recap, pros and cons of EMIT predicates:
> pros: 1) extensible with several predefined functions. And if there is a
> new need, it will very likely be met by defining a function rather than
> defining new keywords/syntax. 2) Easy to understand (think of it as being
> applied to tables to decide when to emit rows).
> cons: 1) Users will gain a lot of power to write expressions.
> pros and cons of special EMIT strategy syntax:
> pros: 1) users will not have a lot of power to write expressions, as the
> syntax is fixed (they can tune a few parameters though)
> cons: 1) I had trouble explaining it to SQL people (it sounds like a hack).
> 2) there are 5 or more known strategies, so we need a list that is longer
> than what was proposed in the paper. 3) Potential backward-compatibility
> issues in case emit strategies change.
>
> Lastly, for the table evolving problem that Julian mentioned (e.g. see
> (100, 6) retracted and (100, 8) added), I totally agree with it because of
> the nature of streaming: you probably never know if data is complete when a
> result emits, thus the result could be updated later.
>
>
> -Rui
>
>
> On Wed, Jan 22, 2020 at 11:29 AM Julian Hyde <[email protected]>
> wrote:
>
>> In the SIGMOD paper, EMIT could be applied to any query. Think of it as
>> executing the query at T, then executing it at T+delta, compare the
>> results, and emit any added or removed records.
>>
>> Tying it to aggregate queries seems wrong. (If we have a special
>> semantics for aggregate queries, how are we possibly going to make the
>> semantics well-defined for, say, user-defined table functions?)
>>
>> Yes, I know aggregate queries have weird behavior. If you have computed
>> ’select product, sum(amount) from orders group by product’, and a new order
>> with (product 100, amount 2), then you are going to see (100, 6) retracted
>> and (100, 8) added. But I think we have to live with that. Otherwise EMIT
>> semantics get a lot more complicated.
>>
>> Julian
>>
>>
>> > On Jan 21, 2020, at 1:24 PM, Rui Wang <[email protected]> wrote:
>> >
>> > I think there was a big omission in my summary about the position of EMIT
>> > and the execution order (and I forgot about ORDER BY).
>> > Try to address them here:
>> >
>> > SELECT
>> >
>> > [FROM TVF windowing] // windowing happens here
>> >
>> > [WHERE clause]
>> >
>> > [GROUP BY clause]
>> >
>> > [HAVING clause]
>> >
>> > [ORDER BY clause]
>> >
>> > [LIMIT clause]
>> > [EMIT clause] // materialization latency
>> >
>> > The position of EMIT is indeed a bit confusing, as the right execution
>> > order should be: FROM -> EMIT -> others, like a normal query. FROM is
>> > continuously generating data and EMIT decides when to emit a part of the
>> > data, and then the other clauses are applied to the emitted data and
>> > update downstream.
>> >
>> > So there are at least two open questions:
>> > 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
>> > like COUNT(*)), EMIT WHERE (can only use single column aliases, like
>> > ORDER BY) or EMIT AFTER (undefined yet if we want to support expressions;
>> > I hope we do).
>> > 2. Where is the EMIT clause? Maybe the clearest position is to put it
>> > after FROM.
>> >
>> >
>> > -Rui
>> >
>> >
>> > On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <[email protected]> wrote:
>> >
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>> >>
>> >>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>> >>> and HAVING), or is it just a coincidence that you use the same word,
>> >>> HAVING?
>> >>>
>> >>
>> >> EMIT HAVING is independent from HAVING, but EMIT HAVING does have a
>> >> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. It is a
>> >> GROUP BY key, then apply EMIT HAVING expressions on sets specified by
>> >> those keys. However, we could loosen the constraint to allow EMIT HAVING
>> >> to appear even without GROUP BY, which just means applying emit control
>> >> on the whole data set rather than controlling the emitting per group.
>> >>
>> >> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT
>> >> control (EMIT HAVING to decide which part of the data can be emitted) ->
>> >> aggregation (normal HAVING and other aggregations). For batch/classic DB
>> >> workloads, the EMIT step will always emit all data. So this idea is
>> >> compatible with existing DB users.
>> >>
>> >> I happened to choose EMIT HAVING because the emit control is very
>> >> similar to HAVING (and some bits of WHERE); the only difference is that
>> >> HAVING is a filter while EMIT HAVING controls the emit. E.g. applying
>> >> HAVING expressions to data decides whether to pass this data downstream
>> >> or not, and applying EMIT HAVING expressions decides whether to pass
>> >> this data downstream now or later (or discard it if the window closes).
>> >>
>> >> If you think the idea actually causes confusion rather than convenience
>> >> in onboarding people to streaming SQL, we can replace EMIT HAVING by EMIT
>> >> AFTER, per the original design.
>> >>
>> >>> I support the idea of latency controls, but I am nervous about
>> >>> allowing full expressions in the EMIT clause if we don't have to.
>> >>>
>> >>>
>> >> Yep. It's a design choice between allowing expressions and keeping a set
>> >> of dedicated SQL syntax for some emit strategies. If we don't use
>> >> extensible EMIT expressions, we will likely need to provide a long list
>> >> of syntax for different emit strategies. For example:
>> >> EMIT AFTER WATERMARK
>> >> EMIT AFTER DELAY
>> >> EMIT AFTER AFTER WATERMARK BUT LATE
>> >> EMIT AFTER COUNT
>> >> EMIT AFTER BEFORE WATERMARK
>> >> etc.
>> >>
>> >> Again, it's a design choice so I am open to both ideas.
>> >>
>> >> However, personally I prefer the EMIT expressions idea because I found it
>> >> was very easy to explain EMIT expressions to SQL people who don't have
>> >> much streaming background. Basically you can say EMIT expressions are
>> >> just applied to rows of the table from the table-valued function.
>> >> If there is a GROUP BY, apply the expression to each group accordingly,
>> >> and the result of the expression indicates if it's ready to emit. This
>> >> ease also mainly comes from having introduced window start and window
>> >> end into the table, thus we should have all the data we need in the
>> >> table to write expressions against them (except for processing time
>> >> triggers).
>> >>
>> >> The downside of expressions though is that people will be allowed to
>> >> write any expression they want, and engines will take responsibility to
>> >> validate those.
>> >>
>> >>
>> >>
>> >>> Aggregate queries have a peculiar scope. (You can only reference
>> >>> grouped expressions, for example.) I don't think we should drag that
>> >>> peculiar scope into the EMIT clause. The simplest thing is to only
>> >>> allow the EMIT clause to reference column aliases, which is similar to
>> >>> the scope used by ORDER BY.
>> >>>
>> >> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
>> >> just like WHERE, you will not reference aggregation columns but just
>> >> column aliases. It can solve the event time triggers and processing time
>> >> triggers. However, it has the shortcoming that it cannot express "emit
>> >> when there are 100 elements accumulated", which requires a COUNT(*).
>> >>
>> >>
>> >>
>> >>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>> >>> so, which comes first?
>> >>>
>> >>>
>> >> Sorry, I forgot about ORDER BY. In my opinion EMIT should be applied
>> >> after FROM, but before all other WHERE, HAVING (or aggregation), ORDER
>> >> BY. Note that EMIT controls when to emit data from the streaming
>> >> dataset, thus all filters and aggregations should be applied after data
>> >> is ready to emit. Note again, to emphasize, that for classic DB/batch
>> >> cases, EMIT is implicitly there and just emits all data once after FROM,
>> >> as all data is already known.
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote:
>> >>
>> >>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>> >>> current_watermark() >= T.window_end AND current_watermark() <
>> >>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>> >>> wrapping the EMIT query as a sub-query and using ordinary SQL
>> >>> expressions on the system columns added by EMIT. (I'm not saying we
>> >>> should do this. But when designing a feature, it's worth calling out
>> >>> whether it adds power or whether it is syntactic sugar.)
>> >>
>> >>
>> >> Sounds like just doing a normal WHERE based on the table from the
>> >> sub-query. Yep, that's an option. Then the open question from it is
>> >> whether we want to mix "EMIT" semantics (which is not really a classic
>> >> SQL filter) with "Filter" semantics (WHERE and HAVING) in the existing
>> >> WHERE and HAVING. I prefer not to, to leave classic DB queries unchanged
>> >> (their WHERE is just a per-row filter without any new semantics, e.g.
>> >> latency control, added).
>> >>
>> >>
>> >>
>> >>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
>> >>>>
>> >>>> Hi community,
>> >>>>
>> >>>> First of all, thanks for all your help on CALCITE-3272 (TUMBLE now
>> >>>> works as a table-valued function). As it starts to work, I want to
>> >>>> continue discussing the following work on streaming SQL: a way to
>> >>>> control the materialization latency of a streaming query. A small note
>> >>>> for people who are not familiar with streaming: because streaming
>> >>>> queries are long-running queries (maybe months or up to a year),
>> >>>> usually there is a need to see results before the query is terminated.
>> >>>> Thus it is desirable to have a way to allow users to specify, e.g.,
>> >>>> how frequently they want to see some result from the query.
>> >>>>
>> >>>> (The following will be a summary of my proposal.
>> >>>> I can convert it to a design doc if people prefer that way. Just let
>> >>>> me know.)
>> >>>>
>> >>>> *Prior work*
>> >>>> My idea is built on top of the "One SQL to Rule Them All" paper [1].
>> >>>> Kudos to the people who contributed to that paper, which became the
>> >>>> foundation of my discussion.
>> >>>>
>> >>>> In [1], an EMIT clause was actually proposed to be added to the end of
>> >>>> the query. Two syntaxes of the EMIT clause were also proposed:
>> >>>> 1. EMIT after the watermark passes the end of the window. E.g. EMIT
>> >>>> [STREAM] AFTER WATERMARK.
>> >>>> 2. Delay emit for a period of time after a change happens (e.g. an
>> >>>> element arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL
>> >>>> '6' MINUTES
>> >>>>
>> >>>> *Typical streaming emit latency patterns*
>> >>>> 1. Event time triggers. Emitting depends on the relationship between
>> >>>> the watermark and the event timestamp of events. Check this video [2]
>> >>>> if you want an introduction to watermarks in the streaming world, and
>> >>>> the data completeness concept based on event timestamps.
>> >>>> 2. Processing time triggers. Emitting depends on the system clock. This
>> >>>> is a natural idea of emitting. E.g. emit the current result every hour
>> >>>> without considering if data in a window is already complete.
>> >>>> 3. Data-driven triggers. E.g. emit when accumulated events exceed a
>> >>>> threshold (e.g. emit when 1000 events have accumulated).
>> >>>> 4. Composite triggers. There is a need to combine 1, 2, 3 by OR and AND
>> >>>> to achieve better latency control.
>> >>>>
>> >>>> *Proposal to discuss*
>> >>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax and
>> >>>> two aggregation functions.
>> >>>>
>> >>>> *EMIT HAVING bool_expression*. EMIT HAVING syntax should come after
>> >>>> normal HAVING syntax and always works with a GROUP BY. EMIT HAVING
>> >>>> behaves similarly to HAVING.
>> >>>> There are two differences that are worth mentioning:
>> >>>> 1. It’s not a filter. It controls when the table-valued function in
>> >>>> the FROM clause should emit a set. The set is specified by the GROUP BY
>> >>>> keys.
>> >>>> 2. GROUP BY keys are visible to EMIT HAVING while that is not the case
>> >>>> for HAVING.
>> >>>>
>> >>>> *current_watermark().* current_watermark() is an aggregation function
>> >>>> that returns a timestamp indicating where the watermark is for a set.
>> >>>>
>> >>>> *processing_time_since_first_element().*
>> >>>> processing_time_since_first_element() is an aggregation function that
>> >>>> returns the system clock time (i.e. processing time) elapsed since the
>> >>>> first element appeared in the set.
>> >>>>
>> >>>> *Motivating examples*
>> >>>> The following is a list of motivating examples of how this proposal
>> >>>> achieves different ways of latency control.
>> >>>>
>> >>>> Assume there is a streaming query that applies fixed windowing (thus
>> >>>> TUMBLE) as follows:
>> >>>>
>> >>>> “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE)
>> >>>> AS T”.
>> >>>> Based on this query, let’s say the derived table T from orders has the
>> >>>> following schema:
>> >>>>
>> >>>> order_id BIGINT
>> >>>> product_id BIGINT
>> >>>> order_meta STRING
>> >>>> event_ts TIMESTAMP
>> >>>> window_start TIMESTAMP
>> >>>> window_end TIMESTAMP
>> >>>>
>> >>>> The following table demonstrates, for different trigger cases, how the
>> >>>> query will be modified by adopting EMIT syntax to express the same
>> >>>> semantics:
>> >>>>
>> >>>> Trigger | SQL Query
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>> (emit after the watermark passes the end of the window, thus data for
>> >>>> the window is believed complete)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     EMIT HAVING current_watermark() >= T.window_end
>> >>>>
>> >>>> AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5))
>> >>>> (emit after a delay of 5 minutes from when the first element appears
>> >>>> in the window)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     GROUP BY T.window_end
>> >>>>     EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >>>>
>> >>>> AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>> (emit after every event that appears after data is really believed
>> >>>> complete in a window)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     GROUP BY T.window_end
>> >>>>     EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>> >>>>
>> >>>> AfterWatermark.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5)))
>> >>>> (emit before a window is complete, by following the delay emit
>> >>>> strategy)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     GROUP BY T.window_end
>> >>>>     EMIT HAVING current_watermark() < T.window_end AND
>> >>>>       processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardMinutes(5)))
>> >>>>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>> (a combination of emitting before the window closes, emitting when the
>> >>>> window closes, and emitting after the window closes)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     GROUP BY T.window_end
>> >>>>     EMIT HAVING
>> >>>>       current_watermark() >= T.window_end OR
>> >>>>       (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>> >>>>       (current_watermark() < T.window_end AND
>> >>>>        processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>   .withAllowedLateness(Duration.standardDays(2))
>> >>>> (emit after the window closes, plus a tolerance of 2 days for late data)
>> >>>>
>> >>>>     SELECT *
>> >>>>     FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>     GROUP BY T.window_end
>> >>>>     EMIT HAVING
>> >>>>       current_watermark() >= T.window_end AND
>> >>>>       current_watermark() < T.window_end + INTERVAL 2 DAY
>> >>>>
>> >>>> Composite triggers
>> >>>> These are illustrated by the examples above: AND and OR can be used to
>> >>>> combine different bool expressions.
>> >>>>
>> >>>>
>> >>>>
>> >>>> Please let me know your thoughts and any other way you prefer to
>> >>>> continue discussing.
>> >>>>
>> >>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>> >>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Rui Wang
>> >>>
>> >>
>> >>
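As a footnote to Julian's description in the thread ("executing the query at T, then executing it at T+delta, compare the results, and emit any added or removed records"), the following Python sketch reproduces the (100, 6) retracted / (100, 8) added example. It is an illustration under assumptions only: sum_by_product and delta are hypothetical helper names, the order rows other than the new (100, 2) order are made-up sample data, and a multiset difference stands in for whatever an engine would actually do.

```python
from collections import Counter, defaultdict


def sum_by_product(orders):
    """Stand-in for: select product, sum(amount) from orders group by product.

    Returns the result as a multiset (Counter) of (product, total) rows.
    """
    totals = defaultdict(int)
    for product, amount in orders:
        totals[product] += amount
    return Counter(totals.items())


def delta(before, after):
    """Compare two query results and return (retracted_rows, added_rows)."""
    retracted = sorted((before - after).elements())
    added = sorted((after - before).elements())
    return retracted, added


# Made-up sample data: product 100 totals 6 at time T.
orders_at_t = [(100, 1), (100, 5), (200, 3)]
# A new order (product 100, amount 2) arrives before T+delta.
orders_at_t_plus_delta = orders_at_t + [(100, 2)]

retracted, added = delta(
    sum_by_product(orders_at_t),
    sum_by_product(orders_at_t_plus_delta),
)
print("retracted:", retracted)  # retracted: [(100, 6)]
print("added:", added)          # added: [(100, 8)]
```

The row for product 200 is unchanged between the two executions, so it appears in neither list; only the changed aggregate row is retracted and re-added.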

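The EMIT HAVING examples in the original proposal can also be read as plain boolean predicates over per-group state. The sketch below mirrors those predicates exactly as written in the thread (including COUNT(*) > 1 for late firings). Everything here is an assumption for illustration: GroupState and its fields are hypothetical stand-ins for current_watermark(), COUNT(*) and processing_time_since_first_element(), and all times are integers in minutes.

```python
from dataclasses import dataclass


@dataclass
class GroupState:
    """Per-group values an engine would have to supply (hypothetical)."""
    window_end: int           # T.window_end
    watermark: int            # stand-in for current_watermark()
    count: int                # stand-in for COUNT(*)
    minutes_since_first: int  # stand-in for processing_time_since_first_element()


def after_watermark(s):
    # EMIT HAVING current_watermark() >= T.window_end
    return s.watermark >= s.window_end


def late_firing(s):
    # EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
    return s.watermark > s.window_end and s.count > 1


def early_firing(s):
    # EMIT HAVING current_watermark() < T.window_end AND
    #   processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return s.watermark < s.window_end and s.minutes_since_first >= 5


def composite(s):
    # The three predicates above combined with OR, as in the composite example.
    return after_watermark(s) or late_firing(s) or early_firing(s)


# Window still open, first element seen 6 minutes ago: early firing applies.
print(composite(GroupState(window_end=10, watermark=3, count=4, minutes_since_first=6)))
# Window still open, first element seen 2 minutes ago: nothing fires yet.
print(composite(GroupState(window_end=10, watermark=3, count=4, minutes_since_first=2)))
```

Written this way, the composite trigger is just a disjunction of the simpler predicates, which is the extensibility argument made for EMIT expressions in the thread.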