Forgot to mention the SQL syntax change:

SELECT clause
[FROM TVF windowing]    // windowing happens here
[WHERE clause]
[GROUP BY clause]
[HAVING clause]
[EMIT HAVING clause]    // materialization latency control

-Rui

On Tue, Jan 21, 2020 at 11:21 AM Rui Wang wrote:

> Hi community,
>
> First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works as a table-valued function). Now that it works, I want to continue the discussion with the next piece of streaming SQL work: a way to control the materialization latency of a streaming query. A short note for people who are not familiar with streaming: streaming queries are long-running (possibly for months or even a year), so there is usually a need to see results before the query terminates. It is therefore desirable to let users specify, e.g., how frequently they want to see results from the query.
>
> (What follows is a summary of my proposal. I can convert it into a design doc if people prefer. Just let me know.)
>
> *Prior work*
> My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the people who contributed to that paper, which became the foundation of this discussion.
>
> [1] proposes adding an EMIT clause at the end of the query, in two forms:
> 1. Emit after the watermark passes the end of the window, e.g. EMIT [STREAM] AFTER WATERMARK.
> 2. Delay emitting for a period of time after a change happens (e.g. an element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES.
>
> *Typical streaming emit-latency patterns*
> 1. Event-time triggers. Emitting depends on the relationship between the watermark and the event timestamps of events. See this video [2] for an introduction to watermarks in the streaming world and to the notion of data completeness based on event timestamps.
> 2. Processing-time triggers. Emitting depends on the system clock. This is a natural way to think about emitting. E.g.
> emit the current result every hour, without considering whether the data in a window is already complete.
> 3. Data-driven triggers. Emit when the accumulated events exceed a threshold (e.g. emit once 1000 events have accumulated).
> 4. Composite triggers. Patterns 1, 2, and 3 need to be combined with OR and AND to achieve finer latency control.
>
> *Proposal to discuss*
> I want to extend the proposal in [1] with an EMIT HAVING clause and two aggregation functions.
>
> *EMIT HAVING bool_expression*. The EMIT HAVING clause comes after the normal HAVING clause and always works with a GROUP BY. EMIT HAVING behaves similarly to HAVING, with two differences worth mentioning:
> 1. It's not a filter. It controls when the table-valued function in the FROM clause should emit a set. The set is identified by the GROUP BY keys.
> 2. GROUP BY keys are visible to EMIT HAVING, which is not the case for HAVING.
>
> *current_watermark()*. current_watermark() is an aggregation function that returns a timestamp indicating where the watermark is for a set.
>
> *processing_time_since_first_element()*. processing_time_since_first_element() is an aggregation function that returns the system-clock time (i.e. processing time) elapsed since the first element appeared in the set.
>
> *Motivating examples*
> Below is a list of motivating examples showing how this proposal achieves different kinds of latency control.
>
> Assume a streaming query applies fixed windowing (hence TUMBLE) as follows:
>
>   SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>
> Based on this query, let's say the derived table T from orders has the following schema:
>
>   order_id      BIGINT
>   product_id    BIGINT
>   order_meta    STRING
>   event_ts      TIMESTAMP
>   window_start  TIMESTAMP
>   window_end    TIMESTAMP
>
> The following table shows, for different trigger cases, how the query would be modified with the EMIT syntax to express the same semantics:
>
> Trigger: AfterWatermark.pastEndOfWindow()
> (emit after the watermark passes the end of the window, i.e. once the data for the window is believed complete)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING current_watermark() >= T.window_end
>
> Trigger: AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
> (emit 5 minutes after the first element appears in the window)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>
> Trigger: AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
> (emit for every event that arrives after the data in a window is believed complete)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>
> Trigger: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
> (emit before the window is complete, following the delayed-emit strategy)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING current_watermark() < T.window_end AND
>               processing_time_since_first_element() >= INTERVAL 5 MINUTE
>
> Trigger:
>   AfterWatermark.pastEndOfWindow()
>       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
>       .withLateFirings(AfterPane.elementCountAtLeast(1))
> (a combination of emitting before the window closes, when it closes, and after it closes)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING
>     current_watermark() >= T.window_end OR
>     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>     (current_watermark() < T.window_end AND
>      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>
> Trigger: AfterWatermark.pastEndOfWindow().withAllowedLateness(Duration.standardDays(2))
> (emit after the window closes, with a tolerance of 2 days for late data)
> SQL query:
>   SELECT *
>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>   GROUP BY T.window_end
>   EMIT HAVING
>     current_watermark() >= T.window_end AND
>     current_watermark() < T.window_end + INTERVAL 2 DAY
>
> Composite triggers: as the examples above illustrate, AND and OR can be used to combine different boolean expressions.
>
> Please let me know your thoughts, and whether you would prefer another way to continue the discussion.
>
> [1]: https://arxiv.org/pdf/1905.12133.pdf
> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>
> Thanks,
> Rui Wang
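The Python sketch below makes the trigger table concrete. It is an illustration, not part of the proposal. It models each EMIT HAVING clause as a boolean predicate over a snapshot of one GROUP BY T.window_end set. `GroupState` and the predicate names are hypothetical. The sketch rests on several simplifying assumptions:

- COUNT(*) counts all elements in the set so far.
- Both the event-time watermark and processing time are represented as plain `datetime` values.
- The engine re-evaluates the predicate whenever the watermark, the clock, or the set changes, and emits the set for that key when the predicate holds. The re-evaluation and emission mechanics themselves are not modeled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class GroupState:
    """Hypothetical snapshot of one GROUP BY T.window_end set."""
    window_end: datetime        # T.window_end, the GROUP BY key
    watermark: datetime         # event-time watermark for this set
    first_element_at: datetime  # processing time when the first element arrived
    now: datetime               # current processing (system-clock) time
    count: int                  # COUNT(*): elements in the set so far (assumption)


def current_watermark(s: GroupState) -> datetime:
    """Models current_watermark(): where the watermark is for this set."""
    return s.watermark


def processing_time_since_first_element(s: GroupState) -> timedelta:
    """Models processing_time_since_first_element(): elapsed processing time."""
    return s.now - s.first_element_at


def on_time(s: GroupState) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end
    return current_watermark(s) >= s.window_end


def after_delay(s: GroupState) -> bool:
    # EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return processing_time_since_first_element(s) >= timedelta(minutes=5)


def late(s: GroupState) -> bool:
    # EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
    return current_watermark(s) > s.window_end and s.count > 1


def early(s: GroupState) -> bool:
    # EMIT HAVING current_watermark() < T.window_end AND
    #             processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return current_watermark(s) < s.window_end and after_delay(s)


def early_on_time_late(s: GroupState) -> bool:
    # Composite trigger: the three predicates above combined with OR.
    return on_time(s) or late(s) or early(s)


def allowed_lateness_2_days(s: GroupState) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end
    #         AND current_watermark() < T.window_end + INTERVAL 2 DAY
    return s.window_end <= current_watermark(s) < s.window_end + timedelta(days=2)


if __name__ == "__main__":
    # Window ending at 12:10. The watermark (12:07) has not reached window_end,
    # and the first element arrived 6 minutes of processing time ago.
    s = GroupState(
        window_end=datetime(2020, 1, 21, 12, 10),
        watermark=datetime(2020, 1, 21, 12, 7),
        first_element_at=datetime(2020, 1, 21, 12, 0),
        now=datetime(2020, 1, 21, 12, 6),
        count=3,
    )
    for name, pred in [("on_time", on_time), ("early", early), ("late", late)]:
        print(name, pred(s))  # on_time False, early True, late False
```

Because each clause is just a predicate over per-set state, the composite trigger needs no special combinator. It is the same predicates joined with `or`, mirroring the AND/OR point in the proposal.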
