Forgot to mention the SQL syntax change:

SELECT clause
[FROM TVF windowing]    // windowing happens here
[WHERE clause]
[GROUP BY clause]
[HAVING clause]
[EMIT HAVING clause]    // materialization latency control


-Rui

On Tue, Jan 21, 2020 at 11:21 AM Rui Wang wrote:

> Hi community,
>
> First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
> as a table-valued function). Now that it works, I want to continue the
> discussion on the next piece of streaming SQL work: a way to control the
> materialization latency of a streaming query. A short note for people who
> are not familiar with streaming: because streaming queries are long-running
> (possibly for months, or even up to a year), users usually need to see
> results before the query terminates. It is therefore desirable to let users
> specify, e.g., how frequently they want to see results from the query.
>
> (What follows is a summary of my proposal. I can turn it into a design doc
> if people prefer; just let me know.)
>
> *Prior work*
> My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the
> people who contributed to that paper, which is the foundation of this
> discussion.
>
> [1] proposes adding an EMIT clause at the end of the query, in two forms:
> 1. Emit after the watermark passes the end of the window, e.g. EMIT
> [STREAM] AFTER WATERMARK.
> 2. Delay emitting for a period of time after a change happens (e.g. an
> element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6'
> MINUTES.
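
A minimal Java sketch of form 2, under one assumed reading of "delay after a change": the first change after an emit arms a timer for the given delay, and any change arriving before the timer fires is folded into the same emit. This reading of [1] is an assumption, and `DelayedEmitSketch`, `onChange` and `fireIfDue` are hypothetical names, not Calcite or Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "EMIT AFTER DELAY <d>": the first change after an emit
// arms a timer for now + d; changes arriving before it fires join the same emit.
final class DelayedEmitSketch {
    private final Duration delay;
    private Instant pendingEmitAt; // null while no change is waiting to be emitted
    private int pendingChanges;    // changes folded into the next emit
    final List<Integer> emittedBatchSizes = new ArrayList<>();

    DelayedEmitSketch(Duration delay) {
        this.delay = delay;
    }

    // Called when an element changes the result (e.g. arrives in a window).
    void onChange(Instant procTime) {
        fireIfDue(procTime);
        if (pendingEmitAt == null) {
            pendingEmitAt = procTime.plus(delay);
        }
        pendingChanges++;
    }

    // Called whenever the system clock (processing time) advances.
    void fireIfDue(Instant now) {
        if (pendingEmitAt != null && !now.isBefore(pendingEmitAt)) {
            emittedBatchSizes.add(pendingChanges);
            pendingEmitAt = null;
            pendingChanges = 0;
        }
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-21T11:00:00Z");
        DelayedEmitSketch emit = new DelayedEmitSketch(Duration.ofMinutes(6));
        emit.onChange(t0);
        emit.onChange(t0.plus(Duration.ofMinutes(1)));
        emit.onChange(t0.plus(Duration.ofMinutes(5)));
        emit.fireIfDue(t0.plus(Duration.ofMinutes(6)));  // first emit: 3 changes
        emit.onChange(t0.plus(Duration.ofMinutes(7)));   // arms a new timer for t0 + 13 min
        emit.fireIfDue(t0.plus(Duration.ofMinutes(10))); // too early, nothing emitted
        emit.fireIfDue(t0.plus(Duration.ofMinutes(13))); // second emit: 1 change
        System.out.println(emit.emittedBatchSizes);      // [3, 1]
    }
}
```

Batching changes this way bounds the output to at most one emit per delay period while changes keep arriving.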
>
> *Typical streaming emit latency patterns*
> 1. Event-time triggers. Emitting depends on the relationship between the
> watermark and the event timestamps of events. See this video [2] for an
> introduction to watermarks in the streaming world and to the concept of
> data completeness based on event timestamps.
> 2. Processing-time triggers. Emitting depends on the system clock. This is
> the most natural way to emit, e.g. emit the current result every hour
> without considering whether the data in a window is already complete.
> 3. Data-driven triggers, e.g. emit when the number of accumulated events
> exceeds a threshold (e.g. emit once 1000 events have accumulated).
> 4. Composite triggers. Patterns 1, 2 and 3 need to be combined with OR and
> AND to achieve finer latency control.
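
Patterns 2 to 4 can be pictured as boolean predicates over a window's state, which is also why combining them with OR and AND comes naturally. A minimal Java sketch, assuming a hypothetical `PaneState` snapshot; "every hour" is approximated as "at least an hour since the last emit", and none of these names are Calcite or Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Predicate;

// Hypothetical sketch: trigger patterns modeled as composable predicates.
final class TriggerPatterns {

    // Hypothetical snapshot of one window's state when the trigger is evaluated.
    static final class PaneState {
        final Instant lastEmitProcTime; // processing time of the previous emit
        final Instant now;              // current processing time (system clock)
        final long elementCount;        // events accumulated since the previous emit

        PaneState(Instant lastEmitProcTime, Instant now, long elementCount) {
            this.lastEmitProcTime = lastEmitProcTime;
            this.now = now;
            this.elementCount = elementCount;
        }
    }

    // Pattern 2: processing-time trigger; fires once the system clock has
    // advanced at least `period` since the last emit.
    static Predicate<PaneState> everyProcessingTime(Duration period) {
        return s -> Duration.between(s.lastEmitProcTime, s.now).compareTo(period) >= 0;
    }

    // Pattern 3: data-driven trigger; fires once enough events have accumulated.
    static Predicate<PaneState> elementCountAtLeast(long threshold) {
        return s -> s.elementCount >= threshold;
    }

    // Pattern 4: composite trigger built with Predicate.or (Predicate.and works the same way).
    static Predicate<PaneState> hourlyOr1000Events() {
        return everyProcessingTime(Duration.ofHours(1)).or(elementCountAtLeast(1000));
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-21T11:00:00Z");
        PaneState quiet = new PaneState(t0, t0.plus(Duration.ofMinutes(30)), 10);
        PaneState busy = new PaneState(t0, t0.plus(Duration.ofMinutes(30)), 1500);
        PaneState idleHour = new PaneState(t0, t0.plus(Duration.ofMinutes(61)), 10);
        System.out.println(hourlyOr1000Events().test(quiet));    // false
        System.out.println(hourlyOr1000Events().test(busy));     // true
        System.out.println(hourlyOr1000Events().test(idleHour)); // true
    }
}
```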
>
> *Proposal to discuss*
> I want to extend the proposal in [1] with an EMIT HAVING syntax and two
> aggregation functions.
>
> *EMIT HAVING bool_expression*. The EMIT HAVING clause comes after the
> normal HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
> similarly to HAVING, with two differences worth mentioning:
> 1. It is not a filter. It controls when the table-valued function in the
> FROM clause should emit a set. The set is specified by the GROUP BY keys.
> 2. GROUP BY keys are visible to EMIT HAVING, while this is not the case for
> HAVING.
>
> *current_watermark()*. current_watermark() is an aggregation function that
> returns a timestamp indicating where the watermark is for a set.
>
> *processing_time_since_first_element()*.
> processing_time_since_first_element() is an aggregation function that
> returns the system clock time (i.e. processing time) elapsed since the
> first element appeared in the set.
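
To illustrate how EMIT HAVING differs from HAVING, here is a minimal Java sketch of a runtime that keeps one state per GROUP BY key (window_end), accumulates every element, and only uses the EMIT HAVING condition to decide which sets to emit. It evaluates the early-firing condition `current_watermark() < T.window_end AND processing_time_since_first_element() >= INTERVAL '5' MINUTE`. All class and method names are hypothetical, and treating the watermark as one value shared by all sets is a simplifying assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: EMIT HAVING decides *when* a set's current result is emitted;
// unlike HAVING, it does not filter the set out of the result.
final class EmitHavingSketch {

    // Per-set state for GROUP BY T.window_end, backing the proposed aggregate functions.
    static final class GroupState {
        long count;                   // COUNT(*) so far
        Instant firstElementProcTime; // system-clock time at which the first element arrived
    }

    private final Map<Instant, GroupState> groups = new TreeMap<>(); // keyed by window_end
    private Instant watermark = Instant.EPOCH; // advanced by the (hypothetical) runtime

    void advanceWatermark(Instant wm) {
        if (wm.isAfter(watermark)) {
            watermark = wm; // watermarks only move forward
        }
    }

    // Every element is accumulated, whatever the EMIT HAVING condition says.
    void onElement(Instant windowEnd, Instant procTime) {
        GroupState g = groups.computeIfAbsent(windowEnd, k -> new GroupState());
        if (g.firstElementProcTime == null) {
            g.firstElementProcTime = procTime;
        }
        g.count++;
    }

    // current_watermark(): a single value shared by all sets in this sketch.
    Instant currentWatermark() {
        return watermark;
    }

    // processing_time_since_first_element(): elapsed system-clock time for one set.
    static Duration processingTimeSinceFirstElement(GroupState g, Instant now) {
        return Duration.between(g.firstElementProcTime, now);
    }

    // EMIT HAVING current_watermark() < T.window_end
    //   AND processing_time_since_first_element() >= INTERVAL '5' MINUTE
    // Returns the window_end keys whose current result would be emitted at `now`.
    List<Instant> groupsToEmit(Instant now) {
        List<Instant> keys = new ArrayList<>();
        for (Map.Entry<Instant, GroupState> e : groups.entrySet()) {
            boolean early = currentWatermark().isBefore(e.getKey());
            Duration waited = processingTimeSinceFirstElement(e.getValue(), now);
            if (early && waited.compareTo(Duration.ofMinutes(5)) >= 0) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    long count(Instant windowEnd) {
        GroupState g = groups.get(windowEnd);
        return g == null ? 0 : g.count;
    }

    public static void main(String[] args) {
        Instant w1 = Instant.parse("2020-01-21T11:10:00Z"); // window_end values
        Instant w2 = Instant.parse("2020-01-21T11:20:00Z");
        Instant w3 = Instant.parse("2020-01-21T11:30:00Z");
        Instant p0 = Instant.parse("2020-01-21T12:00:00Z"); // processing time
        EmitHavingSketch q = new EmitHavingSketch();
        q.onElement(w1, p0);
        q.onElement(w2, p0);
        q.onElement(w3, p0.plus(Duration.ofMinutes(4)));
        q.advanceWatermark(Instant.parse("2020-01-21T11:12:00Z"));
        // w1 is already complete (no early firing); w3 has waited only 1 minute.
        System.out.println(q.groupsToEmit(p0.plus(Duration.ofMinutes(5))));
        System.out.println(q.count(w1)); // w1's element is still counted
    }
}
```

Whether a set's state is cleared after it is emitted is left open here; the sketch keeps accumulating, so the same set can be emitted again with updated results.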
>
> *Motivating examples*
> The following is a list of motivating examples showing how this proposal
> achieves different kinds of latency control.
>
> Assume there is a streaming query that applies fixed windowing (thus
> TUMBLE) as follows:
>
> “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE)
> AS T”. Based on this query, let's say the derived table T from orders has
> the following schema:
>
> order_id     BIGINT
> product_id   BIGINT
> order_meta   STRING
> event_ts     TIMESTAMP
> window_start TIMESTAMP
> window_end   TIMESTAMP
>
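
For readers new to the table-valued TUMBLE, a minimal Java sketch of how the window_start and window_end columns of T could be derived from event_ts: round event_ts down to a multiple of the window size, then add the size. Epoch-aligned windows with no offset and an exclusive window_end are assumptions of this sketch; `TumbleSketch` and its members are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the rows TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE)
// produces: each order plus the [window_start, window_end) of the fixed window holding event_ts.
// Assumes windows aligned to the Unix epoch with no offset.
final class TumbleSketch {

    // One row of the derived table T.
    static final class WindowedOrder {
        final long orderId;
        final long productId;
        final String orderMeta;
        final Instant eventTs;
        final Instant windowStart; // inclusive
        final Instant windowEnd;   // exclusive

        WindowedOrder(long orderId, long productId, String orderMeta,
                      Instant eventTs, Instant windowStart, Instant windowEnd) {
            this.orderId = orderId;
            this.productId = productId;
            this.orderMeta = orderMeta;
            this.eventTs = eventTs;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }
    }

    // Round event_ts down to a multiple of the window size.
    static Instant windowStart(Instant eventTs, Duration size) {
        long sizeMillis = size.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(eventTs.toEpochMilli(), sizeMillis) * sizeMillis);
    }

    // Append window_start / window_end to one input row.
    static WindowedOrder tumble(long orderId, long productId, String orderMeta,
                                Instant eventTs, Duration size) {
        Instant start = windowStart(eventTs, size);
        return new WindowedOrder(orderId, productId, orderMeta, eventTs, start, start.plus(size));
    }

    public static void main(String[] args) {
        WindowedOrder row = tumble(1L, 42L, "meta",
                Instant.parse("2020-01-21T11:23:45Z"), Duration.ofMinutes(10));
        System.out.println(row.windowStart + " .. " + row.windowEnd);
    }
}
```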
> The following table shows, for each trigger case, how the query would be
> written with the EMIT syntax to express the same semantics. Each entry
> gives the trigger first, followed by the equivalent SQL query:
>
> AfterWatermark.pastEndOfWindow()
> (emit after the watermark passes the end of the window, i.e. once the data
> for the window is believed complete)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING current_watermark() >= T.window_end
>
> AfterProcessingTime.pastFirstElementInPane()
> .plusDelayOf(Duration.standardMinutes(5))
> (emit after a delay of 5 minutes once the first element appears in the
> window)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING processing_time_since_first_element() >= INTERVAL '5' MINUTE
>
> AfterWatermark.pastEndOfWindow()
> .withLateFirings(AfterPane.elementCountAtLeast(1))
> (after the data in a window is believed complete, emit again for every
> late event that arrives)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING current_watermark() > T.window_end AND COUNT(*) >= 1
>
> AfterWatermark.pastEndOfWindow()
> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> .plusDelayOf(Duration.standardMinutes(5)))
> (emit before a window is complete, following the delayed-emit strategy)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING current_watermark() < T.window_end AND
>   processing_time_since_first_element() >= INTERVAL '5' MINUTE
>
> AfterWatermark.pastEndOfWindow()
> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> .plusDelayOf(Duration.standardMinutes(5)))
> .withLateFirings(AfterPane.elementCountAtLeast(1))
> (a combination of emitting before the window closes, when the window
> closes, and after the window closes)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING
>   current_watermark() >= T.window_end OR
>   (current_watermark() > T.window_end AND COUNT(*) >= 1) OR
>   (current_watermark() < T.window_end AND
>    processing_time_since_first_element() >= INTERVAL '5' MINUTE)
>
>
> AfterWatermark.pastEndOfWindow(), with the window's allowed lateness set
> via .withAllowedLateness(Duration.standardDays(2))
> (emit after the window closes, tolerating up to 2 days of late data)
>
> SELECT *
> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL '10' MINUTE) AS T
> GROUP BY T.window_end
> EMIT HAVING
>   current_watermark() >= T.window_end AND
>   current_watermark() < T.window_end + INTERVAL '2' DAY
>
> Composite triggers
> (as the examples above illustrate, AND and OR can be used to combine
> different boolean expressions)
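
Because EMIT HAVING takes an ordinary boolean expression, composing triggers amounts to plain predicate composition. A minimal Java sketch of the first and last table entries, where the allowed-lateness condition is built from the watermark condition with AND; `SetSnapshot` and the constant names are hypothetical, and only the watermark and window_end are modeled.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Predicate;

// Hypothetical sketch: two EMIT HAVING conditions from the table written as
// composable predicates over one GROUP BY T.window_end set.
final class EmitConditions {

    // Values visible to EMIT HAVING when it is evaluated for one set.
    static final class SetSnapshot {
        final Instant currentWatermark; // current_watermark()
        final Instant windowEnd;        // T.window_end (a GROUP BY key)

        SetSnapshot(Instant currentWatermark, Instant windowEnd) {
            this.currentWatermark = currentWatermark;
            this.windowEnd = windowEnd;
        }
    }

    // current_watermark() >= T.window_end
    static final Predicate<SetSnapshot> AFTER_WATERMARK =
            s -> !s.currentWatermark.isBefore(s.windowEnd);

    // current_watermark() >= T.window_end
    //   AND current_watermark() < T.window_end + 2 days
    static final Predicate<SetSnapshot> WITHIN_ALLOWED_LATENESS =
            AFTER_WATERMARK.and(s -> s.currentWatermark.isBefore(s.windowEnd.plus(Duration.ofDays(2))));

    public static void main(String[] args) {
        Instant end = Instant.parse("2020-01-21T11:10:00Z");
        Instant[] watermarks = {
            Instant.parse("2020-01-21T11:05:00Z"), // before the window closes
            end,                                  // exactly at window_end
            end.plus(Duration.ofDays(2))          // lateness tolerance used up
        };
        for (Instant wm : watermarks) {
            SetSnapshot s = new SetSnapshot(wm, end);
            System.out.println(wm + " afterWatermark=" + AFTER_WATERMARK.test(s)
                    + " withinLateness=" + WITHIN_ALLOWED_LATENESS.test(s));
        }
    }
}
```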
>
>
>
> Please let me know your thoughts, and whether you would prefer another way
> to continue the discussion.
>
> [1]: https://arxiv.org/pdf/1905.12133.pdf
> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>
>
> Thanks,
> Rui Wang
>
>
>
