In the SIGMOD paper, EMIT could be applied to any query. Think of it as 
executing the query at T, executing it again at T+delta, comparing the results, 
and emitting any added or removed records.

Tying it to aggregate queries seems wrong. (If we have a special semantics for 
aggregate queries, how are we possibly going to make the semantics well-defined 
for, say, user-defined table functions?)

Yes, I know aggregate queries have weird behavior. If you have computed 'select 
product, sum(amount) from orders group by product', and a new order arrives with 
(product 100, amount 2), then you are going to see (100, 6) retracted and (100, 
8) added. But I think we have to live with that. Otherwise EMIT semantics get a 
lot more complicated.
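
To make the "run at T, run at T+delta, diff" model concrete, here is a rough 
sketch in Python. It is only an illustration of the idea, not how Calcite or 
the paper implements EMIT. The helper names (sum_by_product, emit_diff) are 
made up, and so are the order rows, chosen so that product 100 totals 6 at T.

```python
from collections import Counter, defaultdict


def sum_by_product(orders):
    """select product, sum(amount) from orders group by product"""
    totals = defaultdict(int)
    for product, amount in orders:
        totals[product] += amount
    # Represent the result as a multiset of rows so it can be diffed.
    return Counter(totals.items())


def emit_diff(query, input_at_t, input_at_t_plus_delta):
    """Run the query on both snapshots and report removed and added rows."""
    old = query(input_at_t)
    new = query(input_at_t_plus_delta)
    retracted = sorted((old - new).elements())
    added = sorted((new - old).elements())
    return retracted, added


# Hypothetical data: (product, amount) rows visible at time T.
orders_at_t = [(100, 1), (100, 5), (200, 3)]
# At T+delta one new order has arrived: (product 100, amount 2).
orders_at_t_plus_delta = orders_at_t + [(100, 2)]

retracted, added = emit_diff(sum_by_product, orders_at_t, orders_at_t_plus_delta)
print("retracted:", retracted)  # [(100, 6)]
print("added:", added)          # [(100, 8)]
```

Nothing in emit_diff knows that the query is an aggregate; it only compares 
two result sets, so the same function would work for any query.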

Julian


> On Jan 21, 2020, at 1:24 PM, Rui Wang <[email protected]> wrote:
> 
> I think there was a big gap in my summary regarding the position of EMIT
> and the execution order (and I forgot about ORDER BY). I will try to address
> them here:
> 
> SELECT
> 
> [FROM TVF windowing] // windowing happens here
> 
> [WHERE clause]
> 
> [GROUP BY clause]
> 
> [HAVING clause]
> 
> [ORDER BY clause]
> 
> [LIMIT clause]
> [EMIT clause] // materialization latency
> 
> The position of EMIT is indeed a bit confusing, as the right execution
> order should be: FROM -> EMIT -> the other clauses, as in a normal query.
> FROM is continuously generating data and EMIT decides when to emit a part of
> the data; the other clauses are then applied to the emitted data and update
> downstream.
> 
> So at least two open questions:
> 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
> like COUNT(*)), EMIT WHERE (can only use single column alias like ORDER BY)
> or EMIT AFTER (undefined yet if we want to support expressions, I hope we
> do).
> 2. Where should the EMIT clause go? Maybe the clearest position is right
> after FROM.
> 
> 
> -Rui
> 
> 
> On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <[email protected]> wrote:
> 
>> 
>> 
>> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>> 
>>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>>> and HAVING), or is it just a coincidence that you use the same word,
>>> HAVING?
>>> 
>> 
>> EMIT HAVING is independent of HAVING, but EMIT HAVING does have a
>> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. GROUP BY defines
>> the keys, and the EMIT HAVING expressions are applied to the sets specified
>> by those keys. However, we could loosen the constraint and allow EMIT HAVING
>> to appear even without GROUP BY, which would just mean applying emit control
>> to the whole data set rather than controlling emission per group.
>> 
>> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
>> (EMIT HAVING decides which part of the data can be emitted) -> aggregation
>> (normal HAVING and other aggregations). For batch/classic DB workloads, the
>> EMIT step will always emit all data, so this idea is compatible with
>> existing DB users.
>> 
>> I happened to choose EMIT HAVING because emit control is very similar to
>> HAVING (and, in some respects, WHERE). The only difference is that HAVING is
>> a filter while EMIT HAVING controls the emit. E.g. applying HAVING
>> expressions to data decides whether to pass this data downstream or not,
>> while applying EMIT HAVING expressions decides whether to pass this data
>> downstream now or later (or discard it if the window closes).
>> 
>> If you think the idea actually causes confusion rather than making it
>> easier to onboard people to streaming SQL, we can replace EMIT HAVING with
>> EMIT AFTER, per the original design.
>> 
>>> I support the idea of latency controls, but I am nervous about
>>> allowing full expressions in the EMIT clause if we don't have to.
>>> 
>>> 
>> Yep. It's a design choice between allowing expressions and keeping a set of
>> dedicated SQL syntax for some emit strategies. If we don't use extensible
>> EMIT expressions, we will likely need to provide a long list of syntax for
>> different emit strategies. For example:
>> EMIT AFTER WATERMARK
>> EMIT AFTER DELAY
>> EMIT AFTER AFTER WATERMARK BUT LATE
>> EMIT AFTER COUNT
>> EMIT AFTER BEFORE WATERMARK
>> etc.
>> 
>> Again it's a design choice so I am open to both ideas.
>> 
>> However, personally I prefer the EMIT expressions idea because I found it
>> very easy to explain EMIT expressions to SQL people who don't have much
>> streaming background. Basically you can say that EMIT expressions are just
>> applied to rows of the table returned by the table-valued function. If there
>> is a GROUP BY, the expression is applied to each group accordingly, and the
>> result of the expression indicates whether the group is ready to emit. This
>> ease comes mainly from having introduced window start and window end into
>> the table, so we should have all the data we need in the table to write
>> expressions against (except for processing time triggers).
>> 
>> The downside of expressions though is people will be allowed to write any
>> expression they want, and engines will take responsibility to validate
>> those.
>> 
>> 
>> 
>>> Aggregate queries have a peculiar scope. (You can only reference
>>> grouped expressions, for example.) I don't think we should drag that
>>> peculiar scope into the EMIT clause. The simplest thing is to only
>>> allow the EMIT clause to reference column aliases, which is similar to
>>> the scope used by ORDER BY.
>>> 
>> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
>> just like WHERE, you would not reference aggregation columns but just column
>> aliases. It can handle event time triggers and processing time triggers.
>> However, it has the shortcoming that it cannot express "emit when 100
>> elements have accumulated", which requires a COUNT(*).
>> 
>> 
>> 
>>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>>> so, which comes first?
>>> 
>>> 
>> Sorry, I forgot about ORDER BY. In my opinion EMIT should be applied after
>> FROM, but before everything else: WHERE, HAVING (or aggregation), ORDER BY.
>> Note that EMIT controls when to emit data from the streaming dataset, so all
>> filters and aggregations should be applied after the data is ready to emit.
>> Note again that for classic DB/batch cases, EMIT is implicitly there and
>> just emits all data once after FROM, as all data is already known.
>> 
>> 
>> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote:
>> 
>>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>>> current_watermark() >= T.window_end AND current_watermark() <
>>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>>> wrapping the EMIT query as a sub-query and using ordinary SQL
>>> expressions on the system columns added by EMIT. (I'm not saying we
>>> should do this. But when designing a feature, it's worth calling out
>>> whether it adds power or whether it is syntactic sugar.)
>> 
>> 
>> Sounds like just doing a normal WHERE on the table from the sub-query. Yep,
>> that's an option. The open question then is whether we want to mix "EMIT"
>> semantics (which is not really a classic SQL filter) with "filter" semantics
>> (WHERE and HAVING) in the existing WHERE and HAVING. I prefer not to, so as
>> to leave classic DB queries unchanged (their WHERE stays just a per-row
>> filter without any new semantics, e.g. latency control, added).
>> 
>> 
>> 
>>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]> wrote:
>>>> 
>>>> Hi community,
>>>> 
>>>> First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
>>>> as a table-valued function). As it starts to work, I want to continue
>>>> discussing the next piece of streaming SQL work: a way to control the
>>>> materialization latency of a streaming query. A small note for people who
>>>> are not familiar with streaming: because streaming queries are
>>>> long-running queries (maybe months or up to a year), there is usually a
>>>> need to see results before the query is terminated. Thus it is desirable
>>>> to have a way for users to specify, e.g., how frequently they want to see
>>>> some result from the query.
>>>> 
>>>> (The following will be a summary of my proposal. I can convert it to a
>>>> design doc if people prefer that way. Just let me know)
>>>> 
>>>> *Prior work*
>>>> My idea is built on top of the "One SQL to Rule Them All" paper [1]. Kudos
>>>> to the people who contributed to that paper, which became the foundation
>>>> of my discussion.
>>>> 
>>>> In [1], an EMIT clause was proposed to be added to the end of the query.
>>>> Two forms of the EMIT clause were also proposed:
>>>> 1. Emit after the watermark passes the end of the window. E.g. EMIT
>>>> [STREAM] AFTER WATERMARK.
>>>> 2. Delay emit for a period of time after a change happens (e.g. an element
>>>> arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES.
>>>> 
>>>> *Typical streaming emit latency patterns*
>>>> 1. Event time triggers. Emitting depends on the relationship between the
>>>> watermark and the event timestamps of events. Check this video [2] if you
>>>> want an introduction to watermarks in the streaming world, and to the
>>>> data completeness concept based on event timestamps.
>>>> 2. Processing time triggers. Emitting depends on the system clock. This is
>>>> a natural way of emitting. E.g. emit the current result every hour without
>>>> considering whether the data in a window is already complete.
>>>> 3. Data-driven triggers. E.g. emit when accumulated events exceed a
>>>> threshold (e.g. emit when 1000 events have accumulated).
>>>> 4. Composite triggers. There is a need to combine 1, 2 and 3 with OR and
>>>> AND to achieve better latency control.
>>>> 
>>>> *Proposal to discuss*
>>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax and
>>>> two aggregation functions.
>>>> 
>>>> *EMIT HAVING bool_expression*. EMIT HAVING should come after the normal
>>>> HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
>>>> similarly to HAVING. There are two differences worth mentioning:
>>>> 1. It's not a filter. It controls when the table-valued function in the
>>>> FROM clause should emit a set. The set is specified by the GROUP BY keys.
>>>> 2. GROUP BY keys are visible to EMIT HAVING, while that is not the case for
>>>> HAVING.
>>>> 
>>>> *current_watermark().* current_watermark() is an aggregation function
>>>> that returns a timestamp indicating where the watermark is for a set.
>>>> 
>>>> *processing_time_since_first_element().*
>>>> processing_time_since_first_element() is an aggregation function that
>>>> returns the system clock time (i.e. processing time) elapsed since the
>>>> first element appeared in the set.
>>>> 
>>>> *Motivating examples*
>>>> The following is a list of motivating examples of how this proposal
>>>> achieves different kinds of latency control.
>>>> 
>>>> Assume there is a streaming query that applies fixed windowing (thus
>>>> TUMBLE) as follows:
>>>> 
>>>> "SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS
>>>> T". Based on this query, let's say the derived table T from orders has the
>>>> following schema:
>>>> 
>>>> order_id BIGINT
>>>> product_id BIGINT
>>>> order_meta STRING
>>>> event_ts TIMESTAMP
>>>> window_start TIMESTAMP
>>>> window_end TIMESTAMP
>>>> 
>>>> The following table shows, for different trigger cases, how the query is
>>>> modified by adopting EMIT syntax to express the same semantics:
>>>> 
>>>> Trigger:
>>>>   AfterWatermark.pastEndOfWindow()
>>>>   (emit after the watermark passes the end of the window, thus the data
>>>>   for a window is believed complete)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   EMIT HAVING current_watermark() >= T.window_end
>>>> 
>>>> Trigger:
>>>>   AfterProcessingTime.pastFirstElementInPane()
>>>>       .plusDuration(Duration.standardMinutes(5))
>>>>   (emit after a delay of 5 minutes from when the first element appears in
>>>>   the window)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   GROUP BY T.window_end
>>>>   EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>> 
>>>> Trigger:
>>>>   AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>>>>   (emit on every event that appears after the data in a window is
>>>>   believed complete)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   GROUP BY T.window_end
>>>>   EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>>>> 
>>>> Trigger:
>>>>   AfterWatermark.withEarlyFirings(
>>>>       AfterProcessingTime.pastFirstElementInPane()
>>>>           .plusDuration(Duration.standardMinutes(5)))
>>>>   (emit before a window is complete, following the delayed emit strategy)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   GROUP BY T.window_end
>>>>   EMIT HAVING current_watermark() < T.window_end AND
>>>>     processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>> 
>>>> Trigger:
>>>>   AfterWatermark.pastEndOfWindow()
>>>>       .withEarlyFirings(
>>>>           AfterProcessingTime.pastFirstElementInPane()
>>>>               .plusDuration(Duration.standardMinutes(5)))
>>>>       .withLateFirings(AfterPane.elementCountAtLeast(1))
>>>>   (a combination of emitting before the window closes, when the window
>>>>   closes, and after the window closes)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   GROUP BY T.window_end
>>>>   EMIT HAVING
>>>>     current_watermark() >= T.window_end OR
>>>>     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>>>>     (current_watermark() < T.window_end AND
>>>>      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>>>> 
>>>> Trigger:
>>>>   AfterWatermark.pastEndOfWindow()
>>>>       .withAllowedLateness(Duration.standardDays(2))
>>>>   (emit after the window closes, plus a tolerance of 2 days for late data)
>>>> SQL Query:
>>>>   SELECT *
>>>>   FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>>   GROUP BY T.window_end
>>>>   EMIT HAVING
>>>>     current_watermark() >= T.window_end AND
>>>>     current_watermark() < T.window_end + INTERVAL 2 DAY
>>>> 
>>>> Trigger:
>>>>   Composite triggers
>>>> SQL Query:
>>>>   Illustrated by the examples above: AND and OR can be used to combine
>>>>   different boolean expressions.
>>>> 
>>>> 
>>>> 
>>>> Please let me know your thoughts and any other way you prefer to continue
>>>> discussing.
>>>> 
>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>> 
>>>> 
>>>> Thanks,
>>>> Rui Wang
>>> 
>> 
