Hi community,

First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
as a table-valued function). Now that it works, I want to continue
discussing the next piece of work for streaming SQL: a way to control the
materialization latency of a streaming query. A small note for people who
are not familiar with streaming: because streaming queries are
long-running queries (maybe months, or up to a year), there is usually a
need to see results before the query terminates. It is therefore desirable
to let users specify, e.g., how frequently they want to see results from
the query.

(The following is a summary of my proposal. I can convert it to a design
doc if people prefer that; just let me know.)

*Prior work*
My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the
people who contributed to that paper, which is the foundation of this
discussion.

In [1], an EMIT clause was proposed to be added to the end of the query.
Two forms of the EMIT clause were proposed:
1. Emit after the watermark passes the end of the window. E.g. EMIT
[STREAM] AFTER WATERMARK.
2. Delay emitting for a period of time after a change happens (e.g. an
element arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6'
MINUTES.

*Typical streaming emit latency patterns*
1. Event time triggers. Emitting depends on the relationship between the
watermark and the event timestamps of events. See this video [2] for an
introduction to watermarks in the streaming world and to the concept of
data completeness based on event timestamps.
2. Processing time triggers. Emitting depends on the system clock. This is
the most intuitive way to emit, e.g. emit the current result every hour
regardless of whether the data in a window is complete.
3. Data-driven triggers. E.g. emit when the number of accumulated events
exceeds a threshold (e.g. emit after accumulating 1000 events).
4. Composite triggers. There is a need to combine 1, 2 and 3 with OR and
AND to achieve better latency control.
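
To make the four patterns concrete, here is a minimal Python sketch that
models each trigger as a predicate over the state of one window. All names
(PaneState, event_time_trigger, any_of, ...) are hypothetical and exist
only for illustration; they are not taken from [1] or from any engine.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable


@dataclass
class PaneState:
    """Hypothetical snapshot of one window at evaluation time."""
    window_end: datetime   # event-time end of the window
    watermark: datetime    # current event-time watermark
    first_seen: datetime   # processing time when the first element arrived
    now: datetime          # current processing time
    element_count: int     # elements accumulated so far


Trigger = Callable[[PaneState], bool]


def event_time_trigger(s: PaneState) -> bool:
    # 1. Event time: fire once the watermark reaches the end of the window.
    return s.watermark >= s.window_end


def processing_time_trigger(delay: timedelta) -> Trigger:
    # 2. Processing time: fire once `delay` has elapsed on the system clock
    #    since the first element arrived.
    return lambda s: s.now - s.first_seen >= delay


def count_trigger(threshold: int) -> Trigger:
    # 3. Data-driven: fire once at least `threshold` elements accumulated.
    return lambda s: s.element_count >= threshold


def any_of(*triggers: Trigger) -> Trigger:
    # 4. Composite (OR): fire when any sub-trigger fires.
    return lambda s: any(t(s) for t in triggers)


def all_of(*triggers: Trigger) -> Trigger:
    # 4. Composite (AND): fire only when every sub-trigger fires.
    return lambda s: all(t(s) for t in triggers)
```

For example, any_of(event_time_trigger,
processing_time_trigger(timedelta(minutes=5))) fires either when the window
is complete or five minutes after its first element arrived, whichever
comes first.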

*Proposal to discuss*
I want to extend the proposal in [1] and propose EMIT HAVING syntax and two
aggregation functions.

*EMIT HAVING bool_expression*. The EMIT HAVING clause comes after the
normal HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
similarly to HAVING, with two differences worth mentioning:
1. It is not a filter. It controls when the table-valued function in the
FROM clause should emit a set. The set is specified by the GROUP BY keys.
2. GROUP BY keys are visible to EMIT HAVING, which is not the case for
HAVING.

*current_watermark()*. current_watermark() is an aggregation function that
returns a timestamp indicating where the watermark is for a set.

*processing_time_since_first_element()*.
processing_time_since_first_element() is an aggregation function that
returns the system clock time (i.e. processing time) elapsed since the
first element appeared in the set.
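
As a rough illustration of the bookkeeping these two functions would need,
here is a small Python sketch. It is not how Calcite or any runner
implements them: the class GroupClock and its methods are hypothetical, and
the sketch assumes a single watermark shared by all groups of the input.

```python
from datetime import datetime, timedelta
from typing import Dict, Hashable, Optional


class GroupClock:
    """Hypothetical per-group state behind the two proposed functions."""

    def __init__(self) -> None:
        self._first_seen: Dict[Hashable, datetime] = {}
        self._watermark: Optional[datetime] = None

    def on_element(self, group_key: Hashable, processing_time: datetime) -> None:
        # Only the arrival (processing) time of the first element is kept.
        self._first_seen.setdefault(group_key, processing_time)

    def on_watermark(self, watermark: datetime) -> None:
        # Assumption: watermarks never move backwards, so regressions
        # are ignored.
        if self._watermark is None or watermark > self._watermark:
            self._watermark = watermark

    def current_watermark(self) -> Optional[datetime]:
        # None until the first watermark has been observed.
        return self._watermark

    def processing_time_since_first_element(
        self, group_key: Hashable, now: datetime
    ) -> Optional[timedelta]:
        # Elapsed processing time since the group's first element,
        # or None if the group has not seen any element yet.
        first = self._first_seen.get(group_key)
        return None if first is None else now - first
```

An EMIT HAVING predicate such as processing_time_since_first_element() >=
INTERVAL 5 MINUTE would then compare the returned duration against
timedelta(minutes=5) each time the group is evaluated.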

*Motivating examples*
The following motivating examples show how this proposal achieves
different kinds of latency control.

Assume there is a streaming query that applies fixed windowing (thus
TUMBLE), as follows:

“SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS
T”. Based on this query, let’s say the derived table T from orders has the
following schema:

order_id      BIGINT
product_id    BIGINT
order_meta    STRING
event_ts      TIMESTAMP
window_start  TIMESTAMP
window_end    TIMESTAMP
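
For readers new to fixed windows, the window_start and window_end columns
can be thought of as computed per row from event_ts, as in the Python
sketch below. The function name tumble is hypothetical, and the sketch
assumes windows aligned to the Unix epoch and half-open intervals
[window_start, window_end); neither detail is specified in this mail.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def tumble(event_ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return (window_start, window_end) of the fixed window holding event_ts.

    Assumes windows are aligned to the Unix epoch and are half-open,
    i.e. window_start <= event_ts < window_end.
    """
    # Distance from the start of the enclosing window to the event.
    offset = (event_ts - EPOCH) % size
    window_start = event_ts - offset
    return window_start, window_start + size


start, end = tumble(datetime(2019, 1, 1, 12, 7, 30), timedelta(minutes=10))
print(start, end)
```

With a 10 minute window, an order with event_ts 12:07:30 lands in the
window from 12:00:00 to 12:10:00.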



The following examples show, for different trigger cases, how the query is
modified with the EMIT syntax to express the same semantics. Each entry
gives the trigger first and the SQL query second.

1. Trigger: AfterWatermark.pastEndOfWindow()
(emit after the watermark passes the end of the window, at which point the
data for the window is believed to be complete)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
EMIT HAVING current_watermark() >= T.window_end

2. Trigger: AfterProcessingTime.pastFirstElementInPane()
.plusDuration(Duration.standardMinutes(5))
(emit 5 minutes after the first element appears in the window)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
GROUP BY T.window_end
EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE

3. Trigger: AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
(emit for every event that arrives after the data in a window is believed
to be complete)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
GROUP BY T.window_end
EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1

4. Trigger: AfterWatermark.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDuration(Duration.standardMinutes(5)))
(emit before a window is complete, following the delayed-emit strategy)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
GROUP BY T.window_end
EMIT HAVING current_watermark() < T.window_end AND
processing_time_since_first_element() >= INTERVAL 5 MINUTE

5. Trigger: AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDuration(Duration.standardMinutes(5)))
.withLateFirings(AfterPane.elementCountAtLeast(1))
(a combination of emitting before the window closes, when the window
closes, and after the window closes)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
GROUP BY T.window_end
EMIT HAVING
current_watermark() >= T.window_end OR
(current_watermark() > T.window_end AND COUNT(*) > 1) OR
(current_watermark() < T.window_end AND
processing_time_since_first_element() >= INTERVAL 5 MINUTE)

6. Trigger: AfterWatermark.pastEndOfWindow()
.withAllowedLateness(Duration.standardDays(2))
(emit after the window closes, plus a tolerance of 2 days for late data)

SELECT *
FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
GROUP BY T.window_end
EMIT HAVING
current_watermark() >= T.window_end AND current_watermark() < T.window_end
+ INTERVAL 2 DAY

7. Composite triggers: as the examples above illustrate, AND and OR can be
used to combine different boolean expressions.



Please let me know your thoughts and any other way you prefer to continue
discussing.

[1]: https://arxiv.org/pdf/1905.12133.pdf
[2]: https://www.youtube.com/watch?v=TWxSLmkWPm4


Thanks,
Rui Wang
