Does EMIT HAVING have anything to do with aggregate queries (GROUP BY and HAVING), or is it just a coincidence that you use the same word, HAVING?
I support the idea of latency controls, but I am nervous about allowing full expressions in the EMIT clause if we don't have to. Aggregate queries have a peculiar scope. (You can only reference grouped expressions, for example.) I don't think we should drag that peculiar scope into the EMIT clause. The simplest thing is to only allow the EMIT clause to reference column aliases, which is similar to the scope used by ORDER BY.

Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If so, which comes first?

On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
>
> Hi community,
>
> First of all, thanks for all your help with CALCITE-3272 (TUMBLE now works as
> a table-valued function). Now that it works, I want to continue
> discussing the next piece of work for streaming SQL: a way to control the
> materialization latency of a streaming query. A small note for people who
> are not familiar with streaming: because streaming queries are long-running
> (maybe for months or up to a year), there is usually a need to see
> results before the query terminates. It is therefore desirable to have a way
> for users to specify, e.g., how frequently they want to see results
> from the query.
>
> (The following is a summary of my proposal. I can convert it to a
> design doc if people prefer. Just let me know.)
>
> *Prior work*
> My idea is built on top of the "One SQL to Rule Them All" paper [1]. Kudos to
> the people who contributed to that paper, which became the foundation of my
> discussion.
>
> In [1], an EMIT clause was proposed to be added to the end of
> the query. Two forms of the EMIT clause were also proposed:
> 1. Emit after the watermark passes the end of the window, e.g. EMIT
> [STREAM] AFTER WATERMARK.
> 2. Delay emitting for a period of time after a change happens (e.g. an element
> arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES
>
> *Typical streaming emission latency patterns*
> 1. Event time triggers.
> Emitting depends on the relationship between the
> watermark and the event timestamps of events. See this video [2] for
> an introduction to watermarks in the streaming world and to the data
> completeness concept based on event timestamps.
> 2. Processing time triggers. Emitting depends on the system clock. This is
> a natural way of emitting, e.g. emit the current result every hour without
> considering whether the data in a window is already complete.
> 3. Data-driven triggers, e.g. emit when the accumulated events exceed a
> threshold (e.g. emit once 1000 events have accumulated).
> 4. Composite triggers. There is a need to combine 1, 2 and 3 with OR and AND to
> achieve better latency control.
>
> *Proposal to discuss*
> I want to extend the proposal in [1] and propose an EMIT HAVING syntax and two
> aggregation functions.
>
> *EMIT HAVING bool_expression*. The EMIT HAVING clause should come after the
> normal HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
> similarly to HAVING. There are two differences worth mentioning:
> 1. It's not a filter. It controls when the table-valued function in the FROM
> clause should emit a set. The set is specified by the GROUP BY keys.
> 2. GROUP BY keys are visible to EMIT HAVING, while this is not the case for
> HAVING.
>
> *current_watermark()*. current_watermark() is an aggregation function that
> returns a timestamp indicating where the watermark is for a set.
>
> *processing_time_since_first_element()*. processing_time_since_first_element()
> is an aggregation function that returns the system clock time (i.e.
> processing time) elapsed since the first element appeared in the set.
>
> *Motivating examples*
> The following is a list of motivating examples showing how this proposal
> achieves different kinds of latency control.
>
> Assume there is a streaming query that applies fixed windowing (thus TUMBLE)
> as follows:
>
>   SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
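To make the two proposed aggregation functions concrete, here is a minimal Python sketch of the per-set state they would read for the TUMBLE query above. It assumes a 10-minute tumbling window aligned to the Unix epoch, one set per T.window_end, a single watermark that the runtime advances for the whole input, and a processing-time clock supplied by the caller. TumblingGroups, GroupState, assign_window and on_element are hypothetical names, not Calcite or Beam APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical sketch, not Calcite or Beam code.
WINDOW_SIZE = timedelta(minutes=10)  # matches INTERVAL 10 MINUTE in the TUMBLE call
EPOCH = datetime(1970, 1, 1)         # assumption: windows are aligned to the epoch


def assign_window(event_ts: datetime,
                  size: timedelta = WINDOW_SIZE) -> Tuple[datetime, datetime]:
    """Return (window_start, window_end) of the tumbling window containing event_ts."""
    n = (event_ts - EPOCH) // size  # number of whole windows since the epoch (an int)
    window_start = EPOCH + n * size
    return window_start, window_start + size


@dataclass
class GroupState:
    """State for one set, i.e. one GROUP BY T.window_end key."""
    count: int = 0                                   # what COUNT(*) would return
    first_element_proc_time: Optional[datetime] = None


class TumblingGroups:
    def __init__(self) -> None:
        self.groups: Dict[datetime, GroupState] = {}
        # Assumption: the runtime advances one watermark for the whole input.
        self.watermark: datetime = EPOCH

    def on_element(self, event_ts: datetime, proc_time: datetime) -> datetime:
        """Add one element to its window's set and return that set's window_end."""
        _, window_end = assign_window(event_ts)
        state = self.groups.setdefault(window_end, GroupState())
        if state.first_element_proc_time is None:
            state.first_element_proc_time = proc_time  # remember the first arrival
        state.count += 1
        return window_end

    def current_watermark(self) -> datetime:
        # With a single input watermark, every set sees the same value.
        return self.watermark

    def processing_time_since_first_element(self, window_end: datetime,
                                            now: datetime) -> timedelta:
        first = self.groups[window_end].first_element_proc_time
        assert first is not None  # a set only exists after its first element
        return now - first
```

For example, elements with event times 11:22 and 11:27 both land in the set keyed by window_end 11:30, and processing_time_since_first_element() for that set is measured from the processing time of the first of them to arrive.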
> Based on this query, let's say the derived table T from orders has the
> following schema:
>
>   order_id      BIGINT
>   product_id    BIGINT
>   order_meta    STRING
>   event_ts      TIMESTAMP
>   window_start  TIMESTAMP
>   window_end    TIMESTAMP
>
> The following table shows, for different trigger cases, how the query would
> be modified by adopting the EMIT syntax to express the same semantics:
>
> Trigger: AfterWatermark.pastEndOfWindow()
> (emit after the watermark passes the end of the window, i.e. once the data
> for a window is believed complete)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   EMIT HAVING current_watermark() >= T.window_end
>
> Trigger: AfterProcessingTime.pastFirstElementInPane()
>     .plusDelayOf(Duration.standardMinutes(5))
> (emit 5 minutes after the first element appears in the window)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>
> Trigger: AfterWatermark.pastEndOfWindow()
>     .withLateFirings(AfterPane.elementCountAtLeast(1))
> (emit on every element that arrives after the data in a window is believed
> complete)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>
> Trigger: AfterWatermark.pastEndOfWindow()
>     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>         .plusDelayOf(Duration.standardMinutes(5)))
> (emit before a window is complete, following the delayed-emit strategy)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING current_watermark() < T.window_end AND
>     processing_time_since_first_element() >= INTERVAL 5 MINUTE
>
> Trigger: AfterWatermark.pastEndOfWindow()
>     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>         .plusDelayOf(Duration.standardMinutes(5)))
>     .withLateFirings(AfterPane.elementCountAtLeast(1))
> (a combination of emitting before the window closes, when the window closes,
> and after the window closes)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING
>     current_watermark() >= T.window_end OR
>     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>     (current_watermark() < T.window_end AND
>      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>
> Trigger: AfterWatermark.pastEndOfWindow()
>     .withAllowedLateness(Duration.standardDays(2))
> (emit after the window closes, tolerating up to 2 days of late data)
> SQL Query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING
>     current_watermark() >= T.window_end AND
>     current_watermark() < T.window_end + INTERVAL 2 DAY
>
> Composite triggers: as the examples above illustrate, AND and OR can be used
> to combine different boolean expressions.
>
> Please let me know your thoughts, and any other way you would prefer to
> continue the discussion.
>
> [1]: https://arxiv.org/pdf/1905.12133.pdf
> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>
> Thanks,
> Rui Wang
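The EMIT HAVING conditions in the table above can be read as boolean predicates over a snapshot of one set. The minimal Python sketch below transcribes each condition literally (including COUNT(*) > 1 as written) and shows how AND and OR compose them into composite triggers. SetSnapshot, any_of, all_of and the predicate names are hypothetical, and the sketch does not model how a runtime would reset counts between firings.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable

# Hypothetical sketch, not Calcite code: each EMIT HAVING condition becomes a
# predicate over a snapshot of one set (one GROUP BY T.window_end group).


@dataclass(frozen=True)
class SetSnapshot:
    window_end: datetime    # T.window_end, the GROUP BY key
    watermark: datetime     # value of current_watermark()
    count: int              # value of COUNT(*)
    since_first: timedelta  # value of processing_time_since_first_element()


EmitPredicate = Callable[[SetSnapshot], bool]


def any_of(*preds: EmitPredicate) -> EmitPredicate:
    """OR-composition of conditions."""
    return lambda s: any(p(s) for p in preds)


def all_of(*preds: EmitPredicate) -> EmitPredicate:
    """AND-composition of conditions."""
    return lambda s: all(p(s) for p in preds)


def watermark_at_or_past_end(s: SetSnapshot) -> bool:
    return s.watermark >= s.window_end


def watermark_past_end(s: SetSnapshot) -> bool:
    return s.watermark > s.window_end


def watermark_before_end(s: SetSnapshot) -> bool:
    return s.watermark < s.window_end


def count_above_one(s: SetSnapshot) -> bool:
    return s.count > 1


def five_minutes_since_first(s: SetSnapshot) -> bool:
    return s.since_first >= timedelta(minutes=5)


def within_two_days_of_end(s: SetSnapshot) -> bool:
    return s.watermark < s.window_end + timedelta(days=2)


# EMIT HAVING current_watermark() >= T.window_end
after_watermark = watermark_at_or_past_end
# EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
late_firings = all_of(watermark_past_end, count_above_one)
# EMIT HAVING current_watermark() < T.window_end
#   AND processing_time_since_first_element() >= INTERVAL 5 MINUTE
early_firings = all_of(watermark_before_end, five_minutes_since_first)
# The three-way OR from the combined example
early_on_time_late = any_of(after_watermark, late_firings, early_firings)
# EMIT HAVING current_watermark() >= T.window_end
#   AND current_watermark() < T.window_end + INTERVAL 2 DAY
allowed_lateness = all_of(watermark_at_or_past_end, within_two_days_of_end)
```

Writing the conditions this way makes one property easy to see: because current_watermark() > T.window_end implies current_watermark() >= T.window_end, the late-firing disjunct in the combined query is already covered by the on-time disjunct, so early_on_time_late gives the same result with or without late_firings.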
