On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY > and HAVING), or is it just a coincidence that you use the same word, > HAVING? > EMIT HAVING is independent from HAVING, but EMIT HAVING does have a relationship to GROUP BY: EMIT HAVING requires a GROUP BY. It is a GROUP BY key, then apply EMIT HAVING expressions on sets specified by those keys. However we could loose the constraint to allow EMIT HAVING appears even without GROUP BY, which just means that apply emit control on the whole data set than control the emitting per group. In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control (emit having to decide which part of data can be emitted) -> aggregation (normal having and other aggregations). For batch/classic DB workload, the EMIT step will always emit all data. So such idea is compatible with existing DB users. I happen to choose EMIT HAVING because the emit control is very similar to HAVING (and some bits of WHERE) that the only difference is: HAVING is a filter while EMIT HAVING control the emit. E.g apply HAVING expressions to data means if pass this data to downstream or not. And applying EMIT HAVING expressions means if pass this data to downstream now or later (or discard it if the window closes). If you think the idea actually causes confusion rather than convenience to onboard people to use steaming sql, we can replace EMIT HAVING by EMIT AFTER, per the original design. I support the idea of latency controls, but I am nervous about > allowing full expressions in the EMIT clause if we don't have to. > > Yep. It's a design choice of allowing expressions or keeping a set of dedicated SQL syntax for some emit strategies. If don't use extensible EMIT expressions, we likely will need to provide a long list of syntax for different emit strategies. For examples: EMIT AFTER WATERMARK EMIT AFTER DELAY EMIT AFTER AFTER WATERMARK BUT LATE EMIT AFTER COUNT EMIT AFTER BEFORE WATERMARK etc. Again it's a design choice so I am open to both ideas. However, personally I prefer the EMIT expressions idea because I found it was very easy to explain EMIT expressions to SQL people who don't have much streaming brackgroup. Basically you can say EMIT expressions are just applied to rows of table from table-valued function. If there are GROUP BY, each apply expression to each group accordingly. and the result of expression indicates if it's ready to emit. This easiness is also mainly from that we have introduced window start and window end into the table, thus we should have all data we needed in table to write expressions against them (except for processing time triggers). The downside of expressions though is people will be allowed to write any expression they want, and engines will take responsibility to validate those. > Aggregate queries have a peculiar scope. (You can only reference > grouped expressions, for example.) I don't think we should drag that > peculiar scope into the EMIT clause. The simplest thing is to only > allow the EMIT clause to reference column aliases, which is similar to > the scope used by ORDER BY. > > That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE, just like WHERE, you will not reference aggregation columns but just column aliases. It can solve the event time trigger and processing time triggers. However, it has a shortcoming of cannot express: EMIT WHEN there are 100 elements accumulated, which require a COUNT(*). > Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If > so, which comes first? > > Sorry I forgot about ORDER BY. In my opinion EMIT should be applied after FROM, but before all other WHERE, HAVING (or aggregation), ORDER BY. Note that EMIT controls when to emit data from the streaming dataset, thus all filters and aggregations should be applied after data is read to emit. Note again to emphasize that for classic DB/batch cases, EMIT is explicitly there which just EMIT all data once after FROM as all data is already known. On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote: > One more thought. Since EMIT produces a relation, your "EMIT HAVING > current_watermark() >= T.window_end AND current_watermark() < > T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by > wrapping the EMIT query as a sub-query and using ordinal SQL > expressions on the system columns added by EMIT. (I'm not saying we > should do this. But when designing a feature, it's worth calling out > whether it adds power or whether it is syntactic sugar.) Sounds like just do a normal WHERE based on the table from sub-query. yep that's an option. Then open question from it is if we want to mix "EMIT'' semantic (which is not really a classic SQL filter) or "Filter" semantic (WHERE and HAVING) into existing WHERE and HAVING. I prefer not to, to leave classic DB queries unchanged (their WHERE is just a per row filter which having any new semantic, e.g. latency control, added) > On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote: > > > > Hi community, > > > > First of all, thanks all your help on the CALCITE-3272 (TUMBLE now works > as > > a table-valued function). As it starts to work, I want to continue > > discussing the following work of streaming sql: a way to control > > materialization latency of a streaming query. A small note for people who > > are not familiar with streaming: because steaming queries are long > running > > queries (maybe months or up to year), usually there is a need to see > > results before query is terminated. Thus it will be desired to have a way > > to allow users to specify, e.g. how frequently they want to see some > result > > from the query. > > > > (The following will be a summary of my proposal. I can convert it to a > > design doc if people prefer that way. Just let me know) > > > > *Priori work* > > My idea is built on top of "one sql to rule them all paper" [1]. Kudos to > > people who contributed that paper, which became the foundation of my > > discussion. > > > > From [1], an EMIT clause was actually proposed to be added to the end of > > the query. Two syntax of EMIT clause was also proposed: > > 1. EMIT after the watermark passes the end of the window. E.g. EMIT > > [STREAM] AFTER WATERMARK. > > 2. Delay emit for a period of time after a change happens (e.g. element > > arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES > > > > *Typical Streaming emitting latency patterns* > > 1. Event time triggers. Emitting depends on the relationship between > > watermark and event timestamp of events. Check this video [2] if you want > > to have an introduction of watermark in the streaming world, and data > > completeness concept based on event-timestamp. > > 2. Processing time triggers. Emitting depends on the system clock. This > is > > a natural idea of emitting. E.g. emit the current result every hour > without > > considering if data in a window is already complete. > > 3. data-driven triggers. E.g. emit when accumulated events exceed a > > threshold (e.g. emit when have acculucated 1000 events) > > 4. Composite triggers. There is a need to concat 1, 2, 3 by OR and AND to > > achieve better latency control. > > > > *Proposal to discuss* > > I want to extend the proposal in [1] and propose EMIT HAVING syntax and > two > > aggregation functions. > > > > *EMIT HAVING bool_expression*. EMIT HAVING syntax should be after normal > > HAVING syntax and always works with a GROUP BY. EMIT HAVING behaves > similar > > to HAVING. There are two differences that are worth mentioning: > > 1. It’s not a filter. it controls when table-valued function in FROM > clause > > should emit a set. The set is specified by group-by keys. > > 2.GROUP BY keys are visible to EMIT HAVING while it is not the case for > > HAVING. > > > > *current_watermark(). *current_watermark() is an aggregation function > that > > returns a timestamp that provides where watermark is for a set. > > > > *processing_time_since_first_element().* > processing_time_since_first_element() > > is an aggregation function that returns the system clock time (i.e. > > processing time) since the first element appears in the set. > > > > *Motivating examples* > > The following will be a list of motivating examples of how this proposal > > will achieve different ways of latency control. > > > > Assume there is a steaming query that apply fixed windowing (thus TUMBLE) > > as the following: > > > > “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) > AS > > T”. Based on this query, let’s say derived table T from orders has the > > following schema: > > > > > > order_id BIGINT > > > > product_id BIGINT > > > > order_meta STRING > > > > event_ts TIMESTAMP > > > > window_start TIMESTAMP > > > > window_end TIMESTAMP > > > > > > > > The following will be a table to demonstrate for different trigger cases, > > how the query will be modified by adopting EMIT syntax to express the > same > > semantic: > > > > Trigger > > > > SQL Query > > > > AfterWatermark.pastEndOfWindow() > > (emit after watermark passes end of window, thus for a window data is > > believed complete) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > EMIT HAVING current_watermark() >= T.window_end > > > > AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration. > > standardMinutes(5) > > > > (emit after a delay of a 5 minutes when first element appear in window) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > GROUP BY T.window_end > > > > EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE > > > > AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1)) > > (emit after every event appears after data is really believed complete > in a > > window) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > GROUP BY T.window_end > > > > EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1 > > > > > AfterWatermark.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane > > ().plusDuration(Duration.standardMinutes(5)) > > > > (emit before a window is complete, by following the delay emit strategy) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > GROUP BY T.window_end > > > > EMIT HAVING current_watermark() < T.window_end AND > > processing_time_since_first_element() >= INTERVAL 5 MINUTE > > > > AfterWatermark.pastEndOfWindow() > > > > > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration > > (Duration.standardMinutes(5)) > > > > .withLateFirings(AfterPane.elementCountAtLeast(1)) > > > > (a combination of emitting before window closes, emit on window closes > and > > emit after window closes) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > GROUP BY T.window_end > > > > EMIT HAVING > > > > current_watermark() >= T.window_end OR > > > > (current_watermark() > T.window_end AND COUNT(*) > 1) OR > > > > (current_watermark() < T.window_end AND > > processing_time_since_first_element() >= INTERVAL 5 MINUTE) > > > > > > AfterWatermark.pastEndOfWindow() > > > > .withAllowedLateness(Duration.standardDays(2))) > > > > (emit after window closes plus a tolerance of 2 days late data) > > > > SELECT * > > > > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T > > > > GROUP BY T.window_end > > > > EMIT HAVING > > > > current_watermark() >= T.window_end AND current_watermark() < > T.window_end > > + INTERVAL 2 DAY > > > > Composite triggers > > illustrated by examples above that AND and OR can be used to concat > > different bool expressions. > > > > > > > > Please let me know your thoughts and any other way you prefer to continue > > discussing. > > > > [1]: https://arxiv.org/pdf/1905.12133.pdf > > [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4 > > > > > > Thanks, > > Rui Wang >
