I think my summary left a big gap regarding the position of EMIT and the
execution order (and I forgot about ORDER BY). Let me try to address both
here:

SELECT

[FROM TVF windowing] // windowing happens here

[WHERE clause]

[GROUP BY clause]

[HAVING clause]

[ORDER BY clause]

[LIMIT clause]
[EMIT clause] // materialization latency

The position of EMIT is indeed a bit confusing, because the correct execution
order should be: FROM -> EMIT -> the other clauses, as in a normal query.
FROM continuously generates data, EMIT decides when to emit a part of that
data, and the other clauses are then applied to the emitted data to update
downstream.
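
To make the FROM -> EMIT -> others order concrete, here is a minimal Python
sketch. It is only an illustration, not a proposed implementation. The
assumptions are mine: the watermark is simply the largest event_ts seen so
far, the EMIT rule is "watermark >= window_end", and the function names
(tumble, emit_after_watermark, run_query) are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # fixed 10-minute windows, as in the TUMBLE example


def tumble(rows):
    """FROM step: attach window_start / window_end to every incoming row."""
    for row in rows:
        ts = row["event_ts"]
        start = ts - timedelta(
            minutes=ts.minute % 10, seconds=ts.second, microseconds=ts.microsecond
        )
        yield {**row, "window_start": start, "window_end": start + WINDOW}


def emit_after_watermark(windowed_rows):
    """EMIT step: buffer rows per window, release a window once the watermark
    reaches its end. Assumption: watermark = max event_ts seen so far."""
    buffers = defaultdict(list)
    watermark = datetime.min
    for row in windowed_rows:
        buffers[row["window_end"]].append(row)
        watermark = max(watermark, row["event_ts"])
        # sorted() materializes the list first, so popping inside the loop is safe.
        for end in sorted(e for e in buffers if watermark >= e):
            yield buffers.pop(end)


def run_query(rows):
    """Remaining clauses, applied to each emitted batch:
    WHERE product_id = 1, then COUNT(*) per window."""
    results = []
    for batch in emit_after_watermark(tumble(rows)):
        kept = [r for r in batch if r["product_id"] == 1]
        if kept:
            results.append((batch[0]["window_end"], len(kept)))
    return results
```

In this sketch the filter and the aggregate never see a row until the EMIT
step has released its window, which is the ordering described above. In the
batch case all data is already known, so the EMIT step would just release
everything once.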

So there are at least two open questions:
1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
like COUNT(*)), EMIT WHERE (can only use plain column aliases, like ORDER BY),
or EMIT AFTER (not yet defined whether it would support expressions; I hope
it will).
2. Where does the EMIT clause go? Maybe the clearest position is right
after FROM.
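
On question 1, the difference in scope can be sketched in Python. This is
only an illustration under my own assumptions: emit_having is a hypothetical
helper, the predicate sees group-level state (here only COUNT(*)), and the
count resets after each emit. A row-level predicate in the style of EMIT
WHERE would have no such count to look at.

```python
from collections import defaultdict


def emit_having(rows, key, predicate):
    """Release a group's buffered rows whenever predicate(aggregates) is true.

    `aggregates` is per-group state, here just {"count": COUNT(*)}.
    Assumption: the buffer (and so the count) is reset after each emit.
    """
    buffers = defaultdict(list)
    for row in rows:
        group = buffers[row[key]]
        group.append(row)
        if predicate({"count": len(group)}):
            yield row[key], list(group)  # copy before clearing the buffer
            group.clear()
```

For example, emit_having(rows, "window_end", lambda agg: agg["count"] >= 3)
emits a window as soon as three rows have accumulated in it.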


-Rui


On Tue, Jan 21, 2020 at 1:09 PM Rui Wang wrote:

>
>
> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde wrote:
>
>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>> and HAVING), or is it just a coincidence that you use the same word,
>> HAVING?
>>
>
> EMIT HAVING is independent of HAVING, but it does have a relationship to
> GROUP BY: EMIT HAVING requires a GROUP BY. The GROUP BY keys define sets,
> and the EMIT HAVING expressions are applied to each of those sets.
> However, we could loosen the constraint and allow EMIT HAVING to appear
> even without GROUP BY, which would just mean applying emit control to the
> whole data set rather than per group.
>
> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
> (EMIT HAVING decides which part of the data can be emitted) -> aggregation
> (normal HAVING and other aggregations). For batch/classic DB workloads, the
> EMIT step will always emit all data, so the idea stays compatible for
> existing DB users.
>
> I happened to choose EMIT HAVING because emit control is very similar to
> HAVING (and, in some respects, WHERE). The only difference is that HAVING
> is a filter while EMIT HAVING controls the emit. E.g. applying HAVING
> expressions to data decides whether to pass the data downstream or not,
> while applying EMIT HAVING expressions decides whether to pass the data
> downstream now or later (or discard it if the window closes).
>
> If you think the idea actually causes confusion rather than making it
> easier to onboard people to streaming SQL, we can replace EMIT HAVING with
> EMIT AFTER, per the original design.
>
>> I support the idea of latency controls, but I am nervous about
>> allowing full expressions in the EMIT clause if we don't have to.
>>
>>
> Yep. It's a design choice between allowing expressions and keeping a set of
> dedicated SQL syntax for specific emit strategies. If we don't use
> extensible EMIT expressions, we will likely need to provide a long list of
> syntax for different emit strategies. For example:
> EMIT AFTER WATERMARK
> EMIT AFTER DELAY
> EMIT AFTER AFTER WATERMARK BUT LATE
> EMIT AFTER COUNT
> EMIT AFTER BEFORE WATERMARK
> etc.
>
> Again it's a design choice so I am open to both ideas.
>
> However, personally I prefer the EMIT expressions idea because I found it
> very easy to explain EMIT expressions to SQL people who don't have much
> streaming background. Basically, you can say EMIT expressions are just
> applied to rows of the table from the table-valued function. If there is a
> GROUP BY, the expression is applied to each group accordingly, and the
> result of the expression indicates whether the group is ready to emit. This
> ease also comes mainly from having introduced window start and window end
> into the table, so we should have all the data we need in the table to
> write expressions against (except for processing time triggers).
>
> The downside of expressions, though, is that people can write any
> expression they want, and engines must take responsibility for validating
> them.
>
>
>
>> Aggregate queries have a peculiar scope. (You can only reference
>> grouped expressions, for example.) I don't think we should drag that
>> peculiar scope into the EMIT clause. The simplest thing is to only
>> allow the EMIT clause to reference column aliases, which is similar to
>> the scope used by ORDER BY.
>>
> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
> just like WHERE, you would not reference aggregation columns but only column
> aliases. It can handle event time triggers and processing time triggers.
> However, it has the shortcoming that it cannot express "emit when 100
> elements have accumulated", which requires a COUNT(*).
>
>
>
>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>> so, which comes first?
>>
>>
> Sorry, I forgot about ORDER BY. In my opinion EMIT should be applied after
> FROM, but before everything else: WHERE, HAVING (or aggregation), ORDER BY.
> Note that EMIT controls when to emit data from the streaming dataset, so all
> filters and aggregations should be applied after data is ready to emit. To
> emphasize again: for classic DB/batch cases, EMIT is implicitly there and
> just emits all data once after FROM, as all data is already known.
>
>
> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde wrote:
>
>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>> current_watermark() >= T.window_end AND current_watermark() <
>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>> wrapping the EMIT query as a sub-query and using ordinary SQL
>> expressions on the system columns added by EMIT. (I'm not saying we
>> should do this. But when designing a feature, it's worth calling out
>> whether it adds power or whether it is syntactic sugar.)
>
>
> Sounds like just doing a normal WHERE on the table from the sub-query. Yep,
> that's an option. The open question then is whether we want to mix the
> "EMIT" semantics (which is not really a classic SQL filter) into the
> existing "filter" clauses (WHERE and HAVING). I prefer not to, so that
> classic DB queries stay unchanged (their WHERE remains just a per-row
> filter, without any new semantics such as latency control added).
>
>
>
>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang wrote:
>> >
>> > Hi community,
>> >
>> > First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
>> > as a table-valued function). As it starts to work, I want to continue
>> > discussing the next piece of streaming SQL work: a way to control the
>> > materialization latency of a streaming query. A small note for people who
>> > are not familiar with streaming: because streaming queries are
>> > long-running queries (maybe months or up to a year), there is usually a
>> > need to see results before the query is terminated. Thus it is desirable
>> > to have a way for users to specify, e.g., how frequently they want to see
>> > some result from the query.
>> >
>> > (The following will be a summary of my proposal. I can convert it to a
>> > design doc if people prefer that way. Just let me know)
>> >
>> > *Prior work*
>> > My idea is built on top of the "One SQL to Rule Them All" paper [1]. Kudos
>> > to the people who contributed to that paper, which became the foundation
>> > of my discussion.
>> >
>> > In [1], an EMIT clause was proposed to be added to the end of the
>> > query. Two forms of the EMIT clause were proposed:
>> > 1. Emit after the watermark passes the end of the window. E.g. EMIT
>> > [STREAM] AFTER WATERMARK.
>> > 2. Delay emit for a period of time after a change happens (e.g. an element
>> > arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES
>> >
>> > *Typical streaming emit latency patterns*
>> > 1. Event time triggers. Emitting depends on the relationship between the
>> > watermark and the event timestamps of events. Check this video [2] if you
>> > want an introduction to watermarks in the streaming world, and to the
>> > data completeness concept based on event timestamps.
>> > 2. Processing time triggers. Emitting depends on the system clock. This is
>> > a natural way of emitting, e.g. emit the current result every hour
>> > without considering whether the data in a window is already complete.
>> > 3. Data-driven triggers. E.g. emit when accumulated events exceed a
>> > threshold (e.g. emit when 1000 events have accumulated).
>> > 4. Composite triggers. There is a need to combine 1, 2, and 3 with OR and
>> > AND to achieve better latency control.
>> >
>> > *Proposal to discuss*
>> > I want to extend the proposal in [1] and propose the EMIT HAVING syntax
>> > and two aggregation functions.
>> >
>> > *EMIT HAVING bool_expression*. EMIT HAVING should come after the normal
>> > HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
>> > similarly to HAVING. There are two differences worth mentioning:
>> > 1. It's not a filter. It controls when the table-valued function in the
>> > FROM clause should emit a set. The set is specified by the GROUP BY keys.
>> > 2. GROUP BY keys are visible to EMIT HAVING, while that is not the case
>> > for HAVING.
>> >
>> > *current_watermark()*. current_watermark() is an aggregation function
>> > that returns a timestamp indicating where the watermark is for a set.
>> >
>> > *processing_time_since_first_element()*.
>> > processing_time_since_first_element() is an aggregation function that
>> > returns the system clock time (i.e. processing time) elapsed since the
>> > first element appeared in the set.
>> >
>> > *Motivating examples*
>> > The following will be a list of motivating examples of how this proposal
>> > will achieve different ways of latency control.
>> >
>> > Assume there is a streaming query that applies fixed windowing (thus
>> > TUMBLE) as follows:
>> >
>> > “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE)
>> > AS T”. Based on this query, let’s say the derived table T from orders has
>> > the following schema:
>> >
>> >
>> > order_id BIGINT
>> >
>> > product_id BIGINT
>> >
>> > order_meta STRING
>> >
>> > event_ts TIMESTAMP
>> >
>> > window_start TIMESTAMP
>> >
>> > window_end TIMESTAMP
>> >
>> >
>> >
>> > The following table demonstrates, for different trigger cases, how the
>> > query is modified with the EMIT syntax to express the same semantics:
>> >
>> > Trigger:
>> >   AfterWatermark.pastEndOfWindow()
>> >   (emit after the watermark passes the end of the window, i.e. the
>> >   window's data is believed complete)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   EMIT HAVING current_watermark() >= T.window_end
>> >
>> > Trigger:
>> >   AfterProcessingTime.pastFirstElementInPane()
>> >     .plusDuration(Duration.standardMinutes(5))
>> >   (emit 5 minutes after the first element appears in the window)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >
>> > Trigger:
>> >   AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>> >   (emit on every event that arrives after the window's data is believed
>> >   complete)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>> >
>> > Trigger:
>> >   AfterWatermark.withEarlyFirings(
>> >     AfterProcessingTime.pastFirstElementInPane()
>> >       .plusDuration(Duration.standardMinutes(5)))
>> >   (emit before a window is complete, following the delayed emit strategy)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING current_watermark() < T.window_end AND
>> >     processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >
>> > Trigger:
>> >   AfterWatermark.pastEndOfWindow()
>> >     .withEarlyFirings(
>> >       AfterProcessingTime.pastFirstElementInPane()
>> >         .plusDuration(Duration.standardMinutes(5)))
>> >     .withLateFirings(AfterPane.elementCountAtLeast(1))
>> >   (a combination of emitting before the window closes, when the window
>> >   closes, and after the window closes)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING
>> >     current_watermark() >= T.window_end OR
>> >     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>> >     (current_watermark() < T.window_end AND
>> >      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>> >
>> > Trigger:
>> >   AfterWatermark.pastEndOfWindow()
>> >     .withAllowedLateness(Duration.standardDays(2))
>> >   (emit after the window closes, plus a tolerance of 2 days for late data)
>> > SQL query:
>> >   SELECT *
>> >   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >   GROUP BY T.window_end
>> >   EMIT HAVING
>> >     current_watermark() >= T.window_end AND
>> >     current_watermark() < T.window_end + INTERVAL 2 DAY
>> >
>> > Composite triggers
>> > As the examples above illustrate, AND and OR can be used to combine
>> > different boolean expressions.
>> >
>> >
>> >
>> > Please let me know your thoughts, and any other way you would prefer to
>> > continue the discussion.
>> >
>> > [1]: https://arxiv.org/pdf/1905.12133.pdf
>> > [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> >
>> >
>> > Thanks,
>> > Rui Wang
>>
>
