A small update:

I tried to prototype how to support just "EMIT AFTER WATERMARK". Seems like
supporting such syntax in planner without alias reference is pretty
straightforward.

Regarding to engine implementation (e.g. Enumerables), while I am still
prototyping, I feel like the implementation will be the same as "EMIT
window_start >= current_watermark()":

Basically we need a way for different engines to provide their watermark,
and seems like function/operator is a handy way. In this case what is in
plans with EMIT could still be a RexNode that gives an expression between
watermark (a operator that leaves for engine to implement) and a column.
etc.



-Rui



On Wed, Jan 22, 2020 at 12:18 PM Rui Wang <[email protected]> wrote:

> That makes sense. I will remove the aggregation constraint (i.e. EMIT
> requires a GROUP BY). Let's say now EMIT should work on any query for
> general purpose.
>
> Because the above contains too much information, let me further summarize
> critical points here and see if we could reach a consensus:
>
> 1. Do we agree EMIT should execute after FROM, but before any other
> clauses, assuming EMIT works on any query?
> My opinion is EMIT should execute after FROM. It actually matches what
> Julian has said: "Think of it as executing the query at T, then executing
> it at T+delta". Emit just controls how large the delta is. And all other
> comparisons are just the following WHERE, GROUP BY, HAVING, ORDER BY,
> LIMIT, etc. It will also match with DB cases, where EMIT produces a single
> delta once that is from -inf to +inf on the timeline.
>
>
> 2. Can we support EMIT predicates rather than a list of fixed emit
> strategy?
> To recap, pros and cons of EMIT predicates:
> pros: 1)  extensible with several predefined functions. And if there is a
> new need, it will very likely to define a function than defining
> new keywords/syntax. 2) Easy to understand (think about it will be applied
> to tables to decide when to emit rows).
> cons: 1) Users will gain a lot of powers to write expressions.
> pros and cons of special EMIT strategy syntax:
> pros: 1) uses will not have a lot of power to write expressions as syntax
> is fixed (they can tune a few parameters though)
> cons: 1) had trouble explaining it to SQL people (it sounds like a hack).
> 2) there are 5 or more strategies known so we need a list that is longer
> than what was proposed in paper. 3) Potential backward-compatible issues in
> case emit strategies change.
>
> Lastly, for the table evolving problem that Julian mentioned (e.g. see
> (100, 6) retracted and (100, 8) added), I totally agree with it because of
> the nature of streaming: you probably never know if data is complete when a
> result emits, thus the result could be updated later.
>
>
> -Rui
>
>
> On Wed, Jan 22, 2020 at 11:29 AM Julian Hyde <[email protected]>
> wrote:
>
>> In the SIGMOD paper, EMIT could be applied to any query. Think of it as
>> executing the query at T, then executing it at T+delta, compare the
>> results, and emit any added or removed records.
>>
>> Tying it to aggregate queries seems wrong. (If we have a special
>> semantics for aggregate queries, how are we possibly going to make the
>> semantics well-defined for, say, user-defined table functions?)
>>
>> Yes, I know aggregate queries have weird behavior. If you have computed
>> ’select product, sum(amount) from orders group by product’, and a new order
>> with (product 100, amount 2), then you are going to see (100, 6) retracted
>> and (100, 8) added. But I think we have to live with that. Otherwise EMIT
>> semantics get a lot more complicated.
>>
>> Julian
>>
>>
>> > On Jan 21, 2020, at 1:24 PM, Rui Wang <[email protected]> wrote:
>> >
>> > I think there was a big missing in my summary about the position of EMIT
>> > and the execution order (and I forgot about ORDER BY). Try to address
>> them
>> > here:
>> >
>> > SELECT
>> >
>> > [FROM TVF windowing] // windowing happen here
>> >
>> > [WHERE clause]
>> >
>> > [GROUP BY clause]
>> >
>> > [HAVING clause]
>> >
>> > [ORDER BY clause]
>> >
>> > [LIMIT clause]
>> > [EMIT clause] // materialization latency
>> >
>> > The position of EMIT is indeed a bit confusing. As the right execution
>> > order should be: FROM -> EMIT -> others like normal query. FROM is
>> > continuously generating data and EMIT decide when to emit a part of
>> data,
>> > and then other clauses are applied to emitted data and update
>> downstream.
>> >
>> > So at least two open questions:
>> > 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
>> > like COUNT(*)), EMIT WHERE (can only use single column alias like ORDER
>> BY)
>> > or EMIT AFTER (undefined yet if we want to support expressions, I hope
>> we
>> > do).
>> > 2. Where is the EMIT clause? Maybe the most clear position is to put it
>> > after FROM.
>> >
>> >
>> > -Rui
>> >
>> >
>> > On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <[email protected]> wrote:
>> >
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <[email protected]> wrote:
>> >>
>> >>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>> >>> and HAVING), or is it just a coincidence that you use the same word,
>> >>> HAVING?
>> >>>
>> >>
>> >> EMIT HAVING is independent from HAVING, but EMIT HAVING does have a
>> >> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. It is a
>> GROUP BY
>> >> key, then apply EMIT HAVING expressions on sets specified by those
>> keys.
>> >> However we could loose the constraint to allow EMIT HAVING appears even
>> >> without GROUP BY, which just means that apply emit control on the whole
>> >> data set than control the emitting per group.
>> >>
>> >> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT
>> control
>> >> (emit having to decide which part of data can be emitted) ->
>> aggregation
>> >> (normal having and other aggregations). For batch/classic DB workload,
>> the
>> >> EMIT step will always emit all data. So such idea is compatible with
>> >> existing DB users.
>> >>
>> >> I happen to choose EMIT HAVING because the emit control is very
>> similar to
>> >> HAVING (and some bits of WHERE) that the only difference is: HAVING is
>> a
>> >> filter while EMIT HAVING control the emit. E.g apply HAVING
>> expressions to
>> >> data means if pass this data to downstream or not. And applying EMIT
>> >> HAVING expressions means if pass this data to downstream now or later
>> (or
>> >> discard it if the window closes).
>> >>
>> >> If you think the idea actually causes confusion rather than
>> convenience to
>> >> onboard people to use steaming sql, we can replace EMIT HAVING by EMIT
>> >> AFTER, per the original design.
>> >>
>> >> I support the idea of latency controls, but I am nervous about
>> >>> allowing full expressions in the EMIT clause if we don't have to.
>> >>>
>> >>>
>> >> Yep. It's a design choice of allowing expressions or keeping a set of
>> >> dedicated SQL syntax for some emit strategies. If don't use extensible
>> EMIT
>> >> expressions, we likely will need to provide a long list of syntax for
>> >> different emit strategies. For examples:
>> >> EMIT AFTER WATERMARK
>> >> EMIT AFTER DELAY
>> >> EMIT AFTER AFTER WATERMARK BUT LATE
>> >> EMIT AFTER COUNT
>> >> EMIT AFTER BEFORE WATERMARK
>> >> etc.
>> >>
>> >> Again it's a design choice so I am open to both ideas.
>> >>
>> >> However, personally I prefer the EMIT expressions idea because I found
>> it
>> >> was very easy to explain EMIT expressions to SQL people who don't have
>> much
>> >> streaming brackgroup. Basically you can say EMIT expressions are just
>> >> applied to rows of table from table-valued function. If there are
>> GROUP BY,
>> >> each apply expression to each group accordingly. and the result of
>> >> expression indicates if it's ready to emit. This easiness is also
>> mainly
>> >> from that we have introduced window start and window end into the
>> table,
>> >> thus we should have all data we needed in table to write expressions
>> >> against them (except for processing time triggers).
>> >>
>> >> The downside of expressions though is people will be allowed to write
>> any
>> >> expression they want, and engines will take responsibility to validate
>> >> those.
>> >>
>> >>
>> >>
>> >>> Aggregate queries have a peculiar scope. (You can only reference
>> >>> grouped expressions, for example.) I don't think we should drag that
>> >>> peculiar scope into the EMIT clause. The simplest thing is to only
>> >>> allow the EMIT clause to reference column aliases, which is similar to
>> >>> the scope used by ORDER BY.
>> >>>
>> >>> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT
>> WHERE,
>> >> just like WHERE, you will not reference aggregation columns but just
>> column
>> >> aliases. It can solve the event time trigger and processing time
>> triggers.
>> >> However, it has a shortcoming of cannot express: EMIT WHEN there are
>> 100
>> >> elements accumulated, which require a COUNT(*).
>> >>
>> >>
>> >>
>> >>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>> >>> so, which comes first?
>> >>>
>> >>>
>> >> Sorry I forgot about ORDER BY. In my opinion EMIT should be applied
>> after
>> >> FROM, but before all other WHERE, HAVING (or aggregation), ORDER BY.
>> Note
>> >> that EMIT controls when to emit data from the streaming dataset, thus
>> all
>> >> filters and aggregations should be applied after data is read to emit.
>> Note
>> >> again to emphasize that for classic DB/batch cases, EMIT is explicitly
>> >> there which just EMIT all data once after FROM as all data is already
>> >> known.
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <[email protected]> wrote:
>> >>
>> >>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>> >>> current_watermark() >= T.window_end AND current_watermark() <
>> >>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>> >>> wrapping the EMIT query as a sub-query and using ordinal SQL
>> >>> expressions on the system columns added by EMIT. (I'm not saying we
>> >>> should do this. But when designing a feature, it's worth calling out
>> >>> whether it adds power or whether it is syntactic sugar.)
>> >>
>> >>
>> >> Sounds like just do a normal WHERE based on the table from sub-query.
>> yep
>> >> that's an option. Then open question from it is if we want to mix
>> "EMIT''
>> >> semantic (which is not really a classic SQL filter) or "Filter"
>> semantic
>> >> (WHERE and HAVING) into existing WHERE and HAVING. I prefer not to, to
>> >> leave classic DB queries unchanged (their WHERE is just a per row
>> filter
>> >> which having any new semantic, e.g. latency control, added)
>> >>
>> >>
>> >>
>> >>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <[email protected]>
>> wrote:
>> >>>>
>> >>>> Hi community,
>> >>>>
>> >>>> First of all, thanks all your help on the CALCITE-3272 (TUMBLE now
>> >>> works as
>> >>>> a table-valued function). As it starts to work, I want to continue
>> >>>> discussing the following work of streaming sql: a way to control
>> >>>> materialization latency of a streaming query. A small note for people
>> >>> who
>> >>>> are not familiar with streaming: because steaming queries are long
>> >>> running
>> >>>> queries (maybe months or up to year), usually there is a need to see
>> >>>> results before query is terminated. Thus it will be desired to have a
>> >>> way
>> >>>> to allow users to specify, e.g. how frequently they want to see some
>> >>> result
>> >>>> from the query.
>> >>>>
>> >>>> (The following will be a summary of my proposal. I can convert it to
>> a
>> >>>> design doc if people prefer that way. Just let me know)
>> >>>>
>> >>>> *Priori work*
>> >>>> My idea is built on top of "one sql to rule them all paper" [1].
>> Kudos
>> >>> to
>> >>>> people who contributed that paper, which became the foundation of my
>> >>>> discussion.
>> >>>>
>> >>>> From [1], an EMIT clause was actually proposed to be added to the
>> end of
>> >>>> the query. Two syntax of EMIT clause was also proposed:
>> >>>> 1. EMIT after the watermark passes the end of the window. E.g. EMIT
>> >>>> [STREAM] AFTER WATERMARK.
>> >>>> 2. Delay emit for a period of time after a change happens (e.g.
>> element
>> >>>> arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6'
>> >>> MINUTES
>> >>>>
>> >>>> *Typical Streaming emitting latency patterns*
>> >>>> 1. Event time triggers. Emitting depends on the relationship between
>> >>>> watermark and event timestamp of events. Check this video [2] if you
>> >>> want
>> >>>> to have an introduction of watermark in the streaming world, and data
>> >>>> completeness concept based on event-timestamp.
>> >>>> 2. Processing time triggers. Emitting depends on the system clock.
>> This
>> >>> is
>> >>>> a natural idea of emitting. E.g. emit the current result every hour
>> >>> without
>> >>>> considering if data in a window is already complete.
>> >>>> 3. data-driven triggers. E.g. emit when accumulated events exceed a
>> >>>> threshold (e.g. emit when have acculucated 1000 events)
>> >>>> 4. Composite triggers. There is a need to concat 1, 2, 3 by OR and
>> AND
>> >>> to
>> >>>> achieve better latency control.
>> >>>>
>> >>>> *Proposal to discuss*
>> >>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax
>> and
>> >>> two
>> >>>> aggregation functions.
>> >>>>
>> >>>> *EMIT HAVING bool_expression*. EMIT HAVING syntax should be after
>> normal
>> >>>> HAVING syntax and always works with a GROUP BY. EMIT HAVING behaves
>> >>> similar
>> >>>> to HAVING. There are two differences that are worth mentioning:
>> >>>> 1. It’s not a filter. it controls when table-valued function in FROM
>> >>> clause
>> >>>> should emit a set. The set is specified by group-by keys.
>> >>>> 2.GROUP BY keys are visible to EMIT HAVING while it is not the case
>> for
>> >>>> HAVING.
>> >>>>
>> >>>> *current_watermark(). *current_watermark() is an aggregation function
>> >>> that
>> >>>> returns a timestamp that provides where watermark is for a set.
>> >>>>
>> >>>> *processing_time_since_first_element().*
>> >>> processing_time_since_first_element()
>> >>>> is an aggregation function that returns the system clock time (i.e.
>> >>>> processing time) since the first element appears in the set.
>> >>>>
>> >>>> *Motivating examples*
>> >>>> The following will be a list of motivating examples of how this
>> proposal
>> >>>> will achieve different ways of latency control.
>> >>>>
>> >>>> Assume there is a steaming query that apply fixed windowing (thus
>> >>> TUMBLE)
>> >>>> as the following:
>> >>>>
>> >>>> “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10
>> MINUTE)
>> >>> AS
>> >>>> T”. Based on this query, let’s say derived table T from orders  has
>> the
>> >>>> following schema:
>> >>>>
>> >>>>
>> >>>> order_id BIGINT
>> >>>>
>> >>>> product_id BIGINT
>> >>>>
>> >>>> order_meta STRING
>> >>>>
>> >>>> event_ts TIMESTAMP
>> >>>>
>> >>>> window_start TIMESTAMP
>> >>>>
>> >>>> window_end TIMESTAMP
>> >>>>
>> >>>>
>> >>>>
>> >>>> The following will be a table to demonstrate for different trigger
>> >>> cases,
>> >>>> how the query will be modified by adopting EMIT syntax to express the
>> >>> same
>> >>>> semantic:
>> >>>>
>> >>>> Trigger
>> >>>>
>> >>>> SQL Query
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>> (emit after watermark passes end of window, thus for a window data is
>> >>>> believed complete)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> EMIT HAVING current_watermark() >= T.window_end
>> >>>>
>> >>>> AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.
>> >>>> standardMinutes(5)
>> >>>>
>> >>>> (emit after a delay of a 5 minutes when first element appear in
>> window)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING processing_time_since_first_element() >= INTERVAL 5
>> MINUTE
>> >>>>
>> >>>> AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>> (emit after every event appears after data is really believed
>> complete
>> >>> in a
>> >>>> window)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>> >>>>
>> >>>>
>> >>>
>> AfterWatermark.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane
>> >>>> ().plusDuration(Duration.standardMinutes(5))
>> >>>>
>> >>>> (emit before a window is complete, by following the delay emit
>> strategy)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING current_watermark() < T.window_end AND
>> >>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>
>> >>>>
>> >>>
>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration
>> >>>> (Duration.standardMinutes(5))
>> >>>>
>> >>>> .withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>>
>> >>>> (a combination of emitting before window closes, emit on window
>> closes
>> >>> and
>> >>>> emit after window closes)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING
>> >>>>
>> >>>> current_watermark() >= T.window_end OR
>> >>>>
>> >>>> (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>> >>>>
>> >>>> (current_watermark() < T.window_end AND
>> >>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>> >>>>
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>
>> >>>> .withAllowedLateness(Duration.standardDays(2)))
>> >>>>
>> >>>> (emit after window closes plus a tolerance of 2 days late data)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING
>> >>>>
>> >>>> current_watermark() >= T.window_end AND current_watermark() <
>> >>> T.window_end
>> >>>> + INTERVAL 2 DAY
>> >>>>
>> >>>> Composite triggers
>> >>>> illustrated by examples above that AND and OR can be used to concat
>> >>>> different bool expressions.
>> >>>>
>> >>>>
>> >>>>
>> >>>> Please let me know your thoughts and any other way you prefer to
>> >>> continue
>> >>>> discussing.
>> >>>>
>> >>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>> >>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Rui Wang
>> >>>
>> >>
>>
>>

Reply via email to