One more thought. Since EMIT produces a relation, your "EMIT HAVING current_watermark() >= T.window_end AND current_watermark() < T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by wrapping the EMIT query as a sub-query and applying ordinary SQL expressions to the system columns added by EMIT. (I'm not saying we should do this. But when designing a feature, it's worth calling out whether it adds power or whether it is syntactic sugar.)
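The sub-query idea can be illustrated with a small Python sketch. It assumes, purely hypothetically, that an EMIT without a condition produces a row on every change and adds a system column, here called `emit_watermark`, holding the watermark at emission time. Neither that column name nor the row shape comes from the proposal. Under that assumption, the allowed-lateness condition becomes an ordinary predicate in an outer query over the emitted relation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List


@dataclass(frozen=True)
class EmittedRow:
    """One row of the relation produced by an unconditional EMIT (hypothetical shape)."""
    window_end: datetime      # grouping key from the user's query
    order_count: int          # an aggregate from the user's query
    emit_watermark: datetime  # hypothetical system column added by EMIT


def within_allowed_lateness(rows: Iterable[EmittedRow],
                            lateness: timedelta = timedelta(days=2)
                            ) -> List[EmittedRow]:
    """Outer-query filter that uses only ordinary comparisons, i.e.
    WHERE emit_watermark >= window_end
      AND emit_watermark < window_end + INTERVAL 2 DAY
    """
    return [r for r in rows
            if r.window_end <= r.emit_watermark < r.window_end + lateness]


if __name__ == "__main__":
    end = datetime(2020, 1, 21, 12, 10)
    rows = [
        EmittedRow(end, 2, datetime(2020, 1, 21, 12, 5)),   # early: filtered out
        EmittedRow(end, 3, datetime(2020, 1, 21, 12, 10)),  # on time: kept
        EmittedRow(end, 4, datetime(2020, 1, 22, 9, 0)),    # late, within 2 days: kept
        EmittedRow(end, 5, datetime(2020, 1, 23, 12, 10)),  # exactly 2 days late: filtered out
    ]
    print([r.order_count for r in within_allowed_lateness(rows)])  # [3, 4]
```

If that assumption holds, EMIT HAVING adds convenience rather than power for this case. The caveat is that the outer filter only discards rows after the fact, so the inner query still has to emit them.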
On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>
> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
> and HAVING), or is it just a coincidence that you use the same word,
> HAVING?
>
> I support the idea of latency controls, but I am nervous about
> allowing full expressions in the EMIT clause if we don't have to.
>
> Aggregate queries have a peculiar scope. (You can only reference
> grouped expressions, for example.) I don't think we should drag that
> peculiar scope into the EMIT clause. The simplest thing is to only
> allow the EMIT clause to reference column aliases, which is similar to
> the scope used by ORDER BY.
>
> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
> so, which comes first?
>
> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
> >
> > Hi community,
> >
> > First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
> > as a table-valued function). Now that it works, I want to continue
> > discussing the next piece of streaming SQL work: a way to control the
> > materialization latency of a streaming query. A short note for people
> > who are not familiar with streaming: because streaming queries are
> > long-running (possibly for months, or up to a year), users usually need
> > to see results before the query terminates. It is therefore desirable
> > to let users specify, e.g., how frequently they want to see results
> > from the query.
> >
> > (What follows is a summary of my proposal. I can convert it into a
> > design doc if people prefer. Just let me know.)
> >
> > *Prior work*
> > My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to
> > the people who contributed to that paper, which is the foundation of
> > this discussion.
> >
> > [1] proposes adding an EMIT clause to the end of the query, with two
> > syntaxes:
> > 1. Emit after the watermark passes the end of the window, e.g. EMIT
> > [STREAM] AFTER WATERMARK.
> > 2. Delay emitting for a period of time after a change happens (e.g. an
> > element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL
> > '6' MINUTES.
> >
> > *Typical streaming emit-latency patterns*
> > 1. Event-time triggers. Emitting depends on the relationship between
> > the watermark and the event timestamps of events. See this video [2]
> > for an introduction to watermarks in the streaming world and to the
> > data-completeness concept based on event timestamps.
> > 2. Processing-time triggers. Emitting depends on the system clock. This
> > is the most natural notion of emitting, e.g. emit the current result
> > every hour regardless of whether the data in a window is complete.
> > 3. Data-driven triggers, e.g. emit when the number of accumulated
> > events exceeds a threshold (e.g. emit once 1000 events have
> > accumulated).
> > 4. Composite triggers. 1, 2 and 3 need to be combined with OR and AND
> > to achieve finer latency control.
> >
> > *Proposal to discuss*
> > I want to extend the proposal in [1] with an EMIT HAVING syntax and two
> > aggregate functions.
> >
> > *EMIT HAVING bool_expression*. The EMIT HAVING clause comes after the
> > normal HAVING clause and always works with a GROUP BY. EMIT HAVING
> > behaves similarly to HAVING, with two differences worth mentioning:
> > 1. It is not a filter. It controls when the table-valued function in
> > the FROM clause should emit a set. The set is specified by the GROUP BY
> > keys.
> > 2. GROUP BY keys are visible to EMIT HAVING, whereas this is not the
> > case for HAVING.
> >
> > *current_watermark()*. current_watermark() is an aggregate function
> > that returns the timestamp of the current watermark for a set.
> >
> > *processing_time_since_first_element()*.
> > processing_time_since_first_element() is an aggregate function that
> > returns the system clock (i.e. processing) time elapsed since the first
> > element appeared in the set.
> >
> > *Motivating examples*
> > The following examples show how this proposal achieves different kinds
> > of latency control.
> >
> > Assume there is a streaming query that applies fixed windowing (thus
> > TUMBLE) as follows:
> >
> >   SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >
> > Based on this query, let's say the derived table T from orders has the
> > following schema:
> >
> >   order_id      BIGINT
> >   product_id    BIGINT
> >   order_meta    STRING
> >   event_ts      TIMESTAMP
> >   window_start  TIMESTAMP
> >   window_end    TIMESTAMP
> >
> > For each trigger case below, the trigger is listed first, followed by
> > the query rewritten with EMIT syntax to express the same semantics.
> >
> > Trigger: AfterWatermark.pastEndOfWindow()
> > (emit after the watermark passes the end of the window, i.e. once the
> > data for the window is believed complete)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   EMIT HAVING current_watermark() >= T.window_end
> >
> > Trigger: AfterProcessingTime.pastFirstElementInPane()
> >     .plusDelayOf(Duration.standardMinutes(5))
> > (emit 5 minutes after the first element appears in the window)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   GROUP BY T.window_end
> >   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
> >
> > Trigger: AfterWatermark.pastEndOfWindow()
> >     .withLateFirings(AfterPane.elementCountAtLeast(1))
> > (emit on every element that arrives after the data for the window is
> > believed complete)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   GROUP BY T.window_end
> >   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
> >
> > Trigger: AfterWatermark.pastEndOfWindow()
> >     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> >         .plusDelayOf(Duration.standardMinutes(5)))
> > (emit before the window is complete, following the delayed-emit
> > strategy)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   GROUP BY T.window_end
> >   EMIT HAVING current_watermark() < T.window_end AND
> >     processing_time_since_first_element() >= INTERVAL 5 MINUTE
> >
> > Trigger: AfterWatermark.pastEndOfWindow()
> >     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> >         .plusDelayOf(Duration.standardMinutes(5)))
> >     .withLateFirings(AfterPane.elementCountAtLeast(1))
> > (a combination of emitting before the window closes, when the window
> > closes, and after the window closes)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   GROUP BY T.window_end
> >   EMIT HAVING
> >     current_watermark() >= T.window_end OR
> >     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
> >     (current_watermark() < T.window_end AND
> >      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
> >
> > Trigger: AfterWatermark.pastEndOfWindow()
> >     .withAllowedLateness(Duration.standardDays(2))
> > (emit after the window closes, tolerating up to 2 days of late data)
> >
> >   SELECT *
> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> >   GROUP BY T.window_end
> >   EMIT HAVING
> >     current_watermark() >= T.window_end AND
> >     current_watermark() < T.window_end + INTERVAL 2 DAY
> >
> > Composite triggers: as the examples above illustrate, AND and OR can be
> > used to combine different boolean expressions.
> >
> > Please let me know your thoughts, and whether you would prefer another
> > way to continue the discussion.
> >
> > [1]: https://arxiv.org/pdf/1905.12133.pdf
> > [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
> >
> > Thanks,
> > Rui Wang
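To make the proposed EMIT HAVING semantics more concrete, here is a toy Python simulation that evaluates a predicate per GROUP BY key (the TUMBLE window_end) as inputs arrive. It is a sketch under assumptions the proposal does not settle: windows are aligned to the Unix epoch; a group's predicate is re-evaluated when that group receives an element, and every group's predicate is re-evaluated when the watermark advances; processing time only advances when an input arrives (there are no timers); and a group never re-emits an unchanged result. `TriggerContext`, `run_emit_having`, `on_time` and `early_after_5_minutes` are hypothetical names, not Calcite APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Dict, Iterable, List, Optional, Tuple

WINDOW_SIZE = timedelta(minutes=10)  # matches INTERVAL 10 MINUTE in the examples
EPOCH = datetime(1970, 1, 1)         # assumption: windows are aligned to the epoch


def tumble_window_end(event_ts: datetime) -> datetime:
    """Return window_end of the half-open 10-minute tumbling window holding event_ts."""
    offset = (event_ts - EPOCH) % WINDOW_SIZE
    return event_ts - offset + WINDOW_SIZE


@dataclass
class TriggerContext:
    """Values an EMIT HAVING predicate can see for one group (one window)."""
    window_end: datetime
    count: int                   # COUNT(*) for the group so far
    current_watermark: datetime  # current_watermark()
    since_first: timedelta       # processing_time_since_first_element()


@dataclass
class GroupState:
    count: int = 0
    first_proc: Optional[datetime] = None
    last_emitted_count: Optional[int] = None


Input = Tuple[str, datetime, datetime]  # (kind, timestamp, processing time)


def run_emit_having(inputs: Iterable[Input],
                    predicate: Callable[[TriggerContext], bool]
                    ) -> List[Tuple[datetime, datetime, int]]:
    """Replay inputs; return (processing_time, window_end, count) per firing.

    kind is "element" (timestamp = event_ts) or "watermark" (timestamp = new watermark).
    """
    groups: Dict[datetime, GroupState] = {}
    watermark = datetime.min
    firings = []
    for kind, ts, proc_ts in inputs:
        if kind == "element":
            key = tumble_window_end(ts)
            state = groups.setdefault(key, GroupState())
            state.count += 1
            if state.first_proc is None:
                state.first_proc = proc_ts
            keys = [key]                    # only the changed group is re-evaluated
        elif kind == "watermark":
            watermark = max(watermark, ts)  # watermarks never move backwards
            keys = list(groups)             # every group sees the new watermark
        else:
            raise ValueError(f"unknown input kind: {kind}")
        for key in keys:
            state = groups[key]
            ctx = TriggerContext(key, state.count, watermark,
                                 proc_ts - state.first_proc)
            # Assumption: never re-emit an unchanged result for the same group.
            if predicate(ctx) and state.last_emitted_count != state.count:
                firings.append((proc_ts, key, state.count))
                state.last_emitted_count = state.count
    return firings


def on_time(c: TriggerContext) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end
    return c.current_watermark >= c.window_end


def early_after_5_minutes(c: TriggerContext) -> bool:
    # EMIT HAVING current_watermark() < T.window_end
    #   AND processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return (c.current_watermark < c.window_end
            and c.since_first >= timedelta(minutes=5))


if __name__ == "__main__":
    t0 = datetime(2020, 1, 21, 12, 0)

    def at(minutes: int) -> datetime:
        return t0 + timedelta(minutes=minutes)

    inputs = [
        ("element", at(1), at(1)),
        ("element", at(3), at(3)),
        ("watermark", at(5), at(7)),
        ("watermark", at(10), at(12)),
        ("element", at(4), at(13)),  # late: its window's end has already passed
    ]
    print(run_emit_having(inputs, on_time))
    print(run_emit_having(inputs, early_after_5_minutes))
```

With these inputs, the on-time predicate fires when the watermark reaches 12:10 and fires again for the late element (because `current_watermark() >= T.window_end` stays true), while the early-firing predicate fires once, at 12:07 processing time, with two elements in the window.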
