Hi community,

First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works as a table-valued function). Now that it works, I want to continue discussing the next piece of streaming SQL work: a way to control the materialization latency of a streaming query. A short note for people who are not familiar with streaming: streaming queries are long-running (possibly for months or up to a year), so there is usually a need to see results before the query terminates. It is therefore desirable to give users a way to specify, e.g., how frequently they want to see results from the query.
(The following is a summary of my proposal. I can convert it into a design doc if people prefer; just let me know.)

*Prior work*

My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the people who contributed to that paper, which became the foundation of this discussion. In [1], an EMIT clause was proposed to be added to the end of the query. Two forms of the EMIT clause were also proposed:

1. Emit after the watermark passes the end of the window, e.g. EMIT [STREAM] AFTER WATERMARK.
2. Delay emitting for a period of time after a change happens (e.g. an element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES.

*Typical streaming emitting latency patterns*

1. Event time triggers. Emitting depends on the relationship between the watermark and the event timestamps of events. See this video [2] for an introduction to watermarks in the streaming world and to the concept of data completeness based on event timestamps.
2. Processing time triggers. Emitting depends on the system clock. This is a natural way to emit, e.g. emit the current result every hour without considering whether the data in a window is already complete.
3. Data-driven triggers. Emitting depends on the data itself, e.g. emit once the number of accumulated events exceeds a threshold (such as 1000 events).
4. Composite triggers. Patterns 1, 2 and 3 need to be combined with AND and OR to achieve better latency control.

*Proposal to discuss*

I want to extend the proposal in [1] with an EMIT HAVING syntax and two aggregation functions.

*EMIT HAVING bool_expression*. EMIT HAVING should come after the normal HAVING clause and always works with a GROUP BY. EMIT HAVING behaves similarly to HAVING, with two differences worth mentioning:

1. It is not a filter. It controls when the table-valued function in the FROM clause should emit a set. The set is specified by the GROUP BY keys.
2. GROUP BY keys are visible to EMIT HAVING, while this is not the case for HAVING.
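To make the "not a filter" point concrete, here is a minimal Python sketch of how a runtime might evaluate EMIT HAVING for a TUMBLE query grouped by window_end. It is only an illustration under stated assumptions: EmitHavingSketch, EmitContext and Group are hypothetical names, the bool_expression is modeled as a plain Python predicate, current_watermark() and processing_time_since_first_element() (the two aggregation functions described next) are modeled as fields whose values the caller supplies, and it assumes a group re-emits only when its result (here just COUNT(*)) has changed since its last emit.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)


@dataclass
class Group:
    """Hypothetical state kept for one GROUP BY key (window_end)."""
    count: int = 0                         # COUNT(*)
    first_seen: Optional[datetime] = None  # processing time of the first element
    last_emitted_count: int = -1           # assumption: avoids re-emitting an unchanged result


@dataclass
class EmitContext:
    """What an EMIT HAVING predicate can see for one group (hypothetical)."""
    current_watermark: datetime                      # models current_watermark()
    processing_time_since_first_element: timedelta  # models processing_time_since_first_element()
    count: int                                       # models COUNT(*)
    window_end: datetime                             # models T.window_end, the GROUP BY key


class EmitHavingSketch:
    """Toy model of TUMBLE(...) GROUP BY window_end EMIT HAVING <predicate>."""

    def __init__(self, size: timedelta, predicate: Callable[[EmitContext], bool]):
        self.size = size
        self.predicate = predicate
        self.groups: Dict[datetime, Group] = {}
        self.watermark = EPOCH

    def add(self, event_ts: datetime, now: datetime) -> None:
        # Assign the row to its epoch-aligned tumbling window, as TUMBLE would.
        start = EPOCH + ((event_ts - EPOCH) // self.size) * self.size
        group = self.groups.setdefault(start + self.size, Group())
        group.count += 1
        if group.first_seen is None:
            group.first_seen = now

    def advance_watermark(self, watermark: datetime) -> None:
        self.watermark = max(self.watermark, watermark)

    def evaluate(self, now: datetime) -> List[Tuple[datetime, int]]:
        """Return (window_end, COUNT(*)) for each group whose predicate holds.

        No group is ever discarded: unlike HAVING, the predicate only decides
        when a group's current result is emitted.
        """
        emitted = []
        for window_end, group in sorted(self.groups.items()):
            ctx = EmitContext(self.watermark, now - group.first_seen,
                              group.count, window_end)
            if self.predicate(ctx) and group.count != group.last_emitted_count:
                group.last_emitted_count = group.count
                emitted.append((window_end, group.count))
        return emitted


# EMIT HAVING current_watermark() >= T.window_end
def after_watermark(c: EmitContext) -> bool:
    return c.current_watermark >= c.window_end


# EMIT HAVING current_watermark() < T.window_end
#   AND processing_time_since_first_element() >= INTERVAL '5' MINUTE
def early_after_5_minutes(c: EmitContext) -> bool:
    return (c.current_watermark < c.window_end
            and c.processing_time_since_first_element >= timedelta(minutes=5))


t0 = datetime(2019, 10, 1, 12, 0)
q = EmitHavingSketch(timedelta(minutes=10), after_watermark)
q.add(t0 + timedelta(minutes=1), now=t0)
q.add(t0 + timedelta(minutes=3), now=t0)
print(q.evaluate(now=t0))                 # empty: watermark has not reached 12:10
q.advance_watermark(t0 + timedelta(minutes=10))
print(q.evaluate(now=t0))                 # window ending 12:10 emits COUNT(*) = 2
q.add(t0 + timedelta(minutes=5), now=t0)  # late element for the same window
print(q.evaluate(now=t0))                 # same window emits again with COUNT(*) = 3

early = EmitHavingSketch(timedelta(minutes=10), early_after_5_minutes)
early.add(t0 + timedelta(minutes=2), now=t0)
print(early.evaluate(now=t0 + timedelta(minutes=4)))  # empty: only 4 minutes elapsed
print(early.evaluate(now=t0 + timedelta(minutes=5)))  # early emit with COUNT(*) = 1
```

The window ending at 12:10 stays in the state after it emits, which is why the late element can trigger a second emit. A real engine would also need a rule for evicting group state, which this sketch ignores.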
*current_watermark().* current_watermark() is an aggregation function that returns a timestamp indicating where the watermark is for a set.

*processing_time_since_first_element().* processing_time_since_first_element() is an aggregation function that returns the processing time (i.e. system clock time) elapsed since the first element appeared in the set.

*Motivating examples*

The following is a list of motivating examples showing how this proposal achieves different kinds of latency control. Assume there is a streaming query that applies fixed windowing (thus TUMBLE) as follows:

SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T

Based on this query, let's say the derived table T from orders has the following schema:

order_id BIGINT
product_id BIGINT
order_meta STRING
event_ts TIMESTAMP
window_start TIMESTAMP
window_end TIMESTAMP

The following list shows, for each trigger (expressed as an Apache Beam trigger), how the query is modified by adopting the EMIT syntax to express the same semantics:

1. Trigger: AfterWatermark.pastEndOfWindow()
   Meaning: emit after the watermark passes the end of the window, i.e. once the window's data is believed complete.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING current_watermark() >= T.window_end

2. Trigger: AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
   Meaning: emit 5 minutes after the first element appears in the window.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING processing_time_since_first_element() >= INTERVAL '5' MINUTE

3. Trigger: AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
   Meaning: emit on every element that arrives after the data in a window is believed complete.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING current_watermark() > T.window_end AND COUNT(*) >= 1

4. Trigger: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
   Meaning: emit before a window is complete, following the delayed-emit strategy.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING current_watermark() < T.window_end
       AND processing_time_since_first_element() >= INTERVAL '5' MINUTE

5. Trigger: AfterWatermark.pastEndOfWindow()
     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
     .withLateFirings(AfterPane.elementCountAtLeast(1))
   Meaning: a combination of emitting before the window closes, when it closes, and after it closes.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING current_watermark() >= T.window_end
       OR (current_watermark() > T.window_end AND COUNT(*) >= 1)
       OR (current_watermark() < T.window_end
           AND processing_time_since_first_element() >= INTERVAL '5' MINUTE)

6. Trigger: AfterWatermark.pastEndOfWindow(), with .withAllowedLateness(Duration.standardDays(2)) on the window
   Meaning: emit after the window closes, tolerating late data for up to 2 days.
   SQL:
     SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
     GROUP BY T.window_end
     EMIT HAVING current_watermark() >= T.window_end
       AND current_watermark() < T.window_end + INTERVAL '2' DAY

The examples above illustrate composite triggers: AND and OR can be used to
combine different boolean expressions.

Please let me know your thoughts, and whether you would prefer a different way to continue the discussion.

[1]: https://arxiv.org/pdf/1905.12133.pdf
[2]: https://www.youtube.com/watch?v=TWxSLmkWPm4

Thanks,
Rui Wang
