Yes, I agree the problem needs attention. IMO, it depends on how we
define the 'w type. The approach you described above defines the 'w type as
a tuple. If we serialize 'w to a Map, the compatibility will be better. Even
better, we can define 'w as a special type that UDFs and sinks can't use
directly; they must use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX,
etc. I would be very grateful if you could share your solution to this
problem, and we can also discuss it carefully in the PR to be opened. What
do you think?

On Fri, Nov 23, 2018 at 6:21 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Something like:
>
> val x = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>
> x.insertInto("sinkTable") // fails because the result schema has changed from
>                           // ((start, end, rowtime), k1, k2, col1, col2) to
>                           // ((start, end, rowtime, newProperty), k1, k2, col1, col2)
> x.select(myUdf('w)) // fails because the UDF expects (start, end, rowtime) and
>                     // not (start, end, rowtime, newProperty)
>
> Basically, this happens every time the composite type 'w is used as a whole.
>
>
> On Fri, Nov 23, 2018 at 10:45 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> > Hi Fabian,
> >
> > I don't fully understand the concern you mentioned:
> >
> > Any query that relies on the composite type with three fields will fail
> > after adding a fourth field.
> >
> > I would appreciate it if you could give some detailed examples.
> >
> > Regards,
> > Jincheng
> >
> >
> > On Fri, Nov 23, 2018 at 4:41 PM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > My concerns are about the case when there is no additional select()
> > > method, i.e.,
> > >
> > > tab.window(Tumble ... as 'w)
> > >     .groupBy('w, 'k1, 'k2)
> > >     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >
> > > In this case, 'w is a composite field consisting of three fields (end,
> > > start, rowtime).
> > > Once we add a new property, it would need to be added to the composite
> > > type.
> > > Any query that relies on the composite type with three fields will fail
> > > after adding a fourth field.
> > >
> > > Best, Fabian
> > >
> > > On Fri, Nov 23, 2018 at 2:01 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >
> > > > Thanks Fabian,
> > > >
> > > > Thanks a lot for your feedback, and for the very important and necessary
> > > > design reminders!
> > > >
> > > > Yes, you are right! Up to Spark 1.3, the grouping columns had to be
> > > > specified explicitly, but in Spark 1.4 and later they are passed
> > > > implicitly. The behavior was changed because of user feedback. Although
> > > > implicit forwarding has the drawbacks you mentioned, this approach is
> > > > really convenient for users.
> > > > I agree that when grouping on windows we have to pay attention to the
> > > > handling of the window's properties, because we may introduce new window
> > > > properties. So, from this point of view, we delay the processing of the
> > > > window properties, i.e., we pass the composite type 'w through the Table
> > > > API and provide different property operations in select() according to
> > > > the type of 'w, such as 'w.start, 'w.end, 'w.xxx. The Table API will
> > > > restrict and verify the property operations that 'w supports. An example
> > > > is as follows:
> > > >
> > > > tab.window(Tumble ... as 'w)
> > > >     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >     // 'w is a composite field
> > > >     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > >     // in select() we restrict and verify that 'w.xxx is allowed
> > > >     .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx)
> > > >
> > > > I am not sure if I fully understand your concerns; if I misunderstood
> > > > anything, please correct me. Any feedback is appreciated!
> > > >
> > > > Best,
> > > > Jincheng
> > > >
> > > >
> > > > On Thu, Nov 22, 2018 at 10:13 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > First of all, it is correct that the flatMap(Expression*) and
> > > > > flatAggregate(Expression*) methods would mix scalar and table values.
> > > > > This would be a new concept that is not present in the current API.
> > > > > From my point of view, the semantics are quite clear, but I understand
> > > > > that others are more careful and worry about future extensions.
> > > > >
> > > > > I am fine with going for single expression arguments for map() and
> > > > > flatMap(). We can later expand them to Expression* if we feel the need
> > > > > and are more comfortable about the implications.
> > > > > Whenever a time attribute needs to be forwarded, users can fall back to
> > > > > join(TableFunction) as Xiaowei mentioned.
> > > > > So we restrict the usability of the new methods but don't lose
> > > > > functionality and don't prevent future extensions.
> > > > >
> > > > > The aggregate() and flatAggregate() case is more difficult because
> > > > > implicit forwarding of grouping fields cannot be changed later without
> > > > > breaking the API.
> > > > > There are other APIs (e.g., Spark) that also implicitly forward the
> > > > > grouping columns. So this is not uncommon.
> > > > > However, I personally don't like that approach, because it is implicit
> > > > > and introduces a new behavior that is not present in the current API.
> > > > >
> > > > > One thing to consider here is the handling of grouping on windows.
> > > > > If I understood Xiaowei correctly, a composite field that is named like
> > > > > the window alias (e.g., 'w) would be implicitly added to the result of
> > > > > aggregate() or flatAggregate().
> > > > > The composite field would have fields like (start, end, rowtime) or
> > > > > (start, end, proctime) depending on the window type.
> > > > > If we ever introduce a fourth window property, we might break
> > > > > existing queries.
> > > > > Is this something that we should worry about?
> > > > >
> > > > > Best,
> > > > > Fabian
> > > > >
> > > > > On Thu, Nov 22, 2018 at 2:03 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > >
> > > > > > Hi Jincheng,
> > > > > >
> > > > > > #1) ok, got it.
> > > > > >
> > > > > > #3)
> > > > > > > From my point of view, we can start with `Expression`, and if we
> > > > > > > decide to use Expression* after the discussion, we can then
> > > > > > > improve it. In any case, if we use Expression, there is an
> > > > > > > opportunity to move to Expression* (compatibly). If we use
> > > > > > > Expression* directly, it is difficult for us to move back to
> > > > > > > Expression, which would break compatibility between versions.
> > > > > > > What do you think?
> > > > > >
> > > > > > I don't think that's the case here. If we start with the single param
> > > > > > `flatMap(Expression)`, it will need implicit columns to be present in
> > > > > > the result, which:
> > > > > >
> > > > > > a) IMO breaks SQL convention (that's why I'm against this)
> > > > > > b) means we cannot later easily introduce `flatMap(Expression*)`
> > > > > > without those implicit columns, without breaking compatibility or at
> > > > > > least without making `flatMap(Expression*)` and `flatMap(Expression)`
> > > > > > terribly inconsistent.
> > > > > >
> > > > > > To elaborate on (a): it's not nice if our own API is inconsistent and
> > > > > > sometimes behaves one way and sometimes another way:
> > > > > >
> > > > > > table.groupBy('k).select(scalarAggregateFunction('v)) => single column
> > > > > > result, just the output of `scalarAggregateFunction`
> > > > > > vs
> > > > > > table.groupBy('k).flatAggregate(tableAggregateFunction('v)) => both the
> > > > > > result of `tableAggregateFunction` plus the key (and an optional window
> > > > > > context?)
> > > > > >
> > > > > > Thus I think we have to decide now which way we want to jump, since
> > > > > > later will be too late. Or again, am I missing something? :)
> > > > > >
> > > > > > Piotrek
> > > > > >
> > > > > > > On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Piotrek,
> > > > > > > #1) We have unbounded and bounded group window aggregates. For the
> > > > > > > unbounded case, we should fire the result early with retract
> > > > > > > messages; we cannot use the watermark, because an unbounded
> > > > > > > aggregate never finishes (as an improvement, we can introduce
> > > > > > > micro-batching in the future). For bounded windows we never support
> > > > > > > early firing, so we do not need retraction.
> > > > > > > #3) About the validation of `table.select(F('a).unnest(), 'b,
> > > > > > > G('c).unnest())` / `table.flatMap(F('a), 'b, scalarG('c))`: Fabian
> > > > > > > mentioned this above, please look at the prior mail. For
> > > > > > > `table.flatMap(F('a), 'b, scalarG('c))`, which is what we are
> > > > > > > concerned about, we should discuss the issue of `Expression*` vs
> > > > > > > `Expression`. From my point of view, we can start with
> > > > > > > `Expression`, and if we decide to use Expression* after the
> > > > > > > discussion, we can then improve it. In any case, if we use
> > > > > > > Expression, there is an opportunity to move to Expression*
> > > > > > > (compatibly). If we use Expression* directly, it is difficult for
> > > > > > > us to move back to Expression, which would break compatibility
> > > > > > > between versions. What do you think?
> > > > > > >
> > > > > > > If anything is unclear, any feedback is welcome! Again, thanks for
> > > > > > > sharing your thoughts!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jincheng
> > > > > > >
> > > > > > > On Wed, Nov 21, 2018 at 9:37 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > >
> > > > > > >> Hi Jincheng,
> > > > > > >>
> > > > > > >>> #1) No, the watermark solves the issue of late events. Here, the
> > > > > > >>> performance problem is caused by the update emit mode, i.e., when
> > > > > > >>> the current calculation result is output, the previous calculation
> > > > > > >>> result needs to be retracted.
> > > > > > >>
> > > > > > >> Hmm, yes, I missed this. For time-windowed cases (some
> > > > > > >> aggregate/flatAggregate cases) emitting only on watermarks should
> > > > > > >> solve the problem. For non-time-windowed cases it would reduce the
> > > > > > >> amount of retractions, right? Or am I still missing something?
> > > > > > >>
> > > > > > >>> #3) I still hope to keep the simplicity that select() only
> > > > > > >>> supports projected scalars; we can hardly tell the semantics of
> > > > > > >>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > > > > >>
> > > > > > >> table.select(F('a).unnest(), 'b, G('c).unnest())
> > > > > > >>
> > > > > > >> could be rejected during some validation phase. On the other hand:
> > > > > > >>
> > > > > > >> table.select(F('a).unnest(), 'b, scalarG('c))
> > > > > > >> or
> > > > > > >> table.flatMap(F('a), 'b, scalarG('c))
> > > > > > >>
> > > > > > >> could work and be more or less syntactic sugar for cross apply.
> > > > > > >>
> > > > > > >> Piotrek
> > > > > > >>
> > > > > > >>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>> Hi Shaoxuan & Hequn,
> > > > > > >>>
> > > > > > >>> Thanks for your suggestion, I'll file the JIRAs later.
> > > > > > >>> We can prepare PRs while continuing to move the ongoing
> > > > > > >>> discussion forward.
> > > > > > >>>
> > > > > > >>> Regards,
> > > > > > >>> Jincheng
> > > > > > >>>
> > > > > > >>> On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>>> Hi Piotrek,
> > > > > > >>>> Thanks for your feedback, and thanks for sharing your thoughts!
> > > > > > >>>>
> > > > > > >>>> #1) No, the watermark solves the issue of late events. Here, the
> > > > > > >>>> performance problem is caused by the update emit mode, i.e., when
> > > > > > >>>> the current calculation result is output, the previous calculation
> > > > > > >>>> result needs to be retracted.
> > > > > > >>>> #2) As I mentioned above, we should continue the discussion until
> > > > > > >>>> we solve the problems raised by Xiaowei and Fabian.
> > > > > > >>>> #3) I still hope to keep the simplicity that select() only
> > > > > > >>>> supports projected scalars; we can hardly tell the semantics of
> > > > > > >>>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > > > > >>>>
> > > > > > >>>> Thanks,
> > > > > > >>>> Jincheng
> > > > > > >>>>
> > > > > > >>>> On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi,
> > > > > > >>>>>
> > > > > > >>>>> 1.
> > > > > > >>>>>
> > > > > > >>>>>> In fact, in addition to the design of APIs, there will be various
> > > > > > >>>>>> performance optimization details, such as: the table aggregate
> > > > > > >>>>>> function's emitValue will generate multiple calculation results; in
> > > > > > >>>>>> extreme cases, each record will trigger a large number of retract
> > > > > > >>>>>> messages, and this will have poor performance
> > > > > > >>>>>
> > > > > > >>>>> Can this be solved/mitigated by emitting the results only on
> > > > > > >>>>> watermarks? I think that was the path that we decided to take both
> > > > > > >>>>> for Temporal Joins and upsert stream conversion. I know that this
> > > > > > >>>>> increases the latency and there is a place for a future global
> > > > > > >>>>> setting/user preference "emit the data ASAP mode", but emitting
> > > > > > >>>>> only on watermarks seems to me a better/more sane default.
> > > > > > >>>>>
> > > > > > >>>>> 2.
> > > > > > >>>>>
> > > > > > >>>>> With respect to the API discussion and implicit columns: the
> > > > > > >>>>> problem for me so far is that I'm not sure if I like the additional
> > > > > > >>>>> complexity of the `append()` solution, while implicit columns are
> > > > > > >>>>> definitely not in the spirit of SQL. Neither joins nor aggregations
> > > > > > >>>>> add extra unexpected columns to the result without asking. This
> > > > > > >>>>> definitely can be confusing for the users since it breaks the
> > > > > > >>>>> convention. Thus I would lean towards Fabian's proposal of
> > > > > > >>>>> multi-argument `map(Expression*)` from those 3 options.
> > > > > > >>>>>
> > > > > > >>>>> 3.
> > > > > > >>>>>
> > > > > > >>>>> Another topic is that I'm not 100% convinced that we should be
> > > > > > >>>>> adding new API functions for `map`, `aggregate`, `flatMap` and
> > > > > > >>>>> `flatAggregate`. I think the same could be achieved by changing
> > > > > > >>>>>
> > > > > > >>>>> table.map(F('x))
> > > > > > >>>>>
> > > > > > >>>>> into
> > > > > > >>>>>
> > > > > > >>>>> table.select(F('x)).unnest()
> > > > > > >>>>> or
> > > > > > >>>>> table.select(F('x).unnest())
> > > > > > >>>>>
> > > > > > >>>>> where `unnest()` means unnesting a row/tuple type into a columnar
> > > > > > >>>>> table.
> > > > > > >>>>>
> > > > > > >>>>> table.flatMap(F('x))
> > > > > > >>>>>
> > > > > > >>>>> could, on the other hand, also be handled by
> > > > > > >>>>>
> > > > > > >>>>> table.select(F('x))
> > > > > > >>>>>
> > > > > > >>>>> by correctly deducing that F(x) is a multi-row output function.
> > > > > > >>>>>
> > > > > > >>>>> The same might apply to `aggregate(F('x))`, but this maybe could
> > > > > > >>>>> be replaced by:
> > > > > > >>>>>
> > > > > > >>>>> table.groupBy(…).select(F('x).unnest())
> > > > > > >>>>>
> > > > > > >>>>> Adding scalar functions should also be possible:
> > > > > > >>>>>
> > > > > > >>>>> table.groupBy('k).select(F('x).unnest(), 'k)
> > > > > > >>>>>
> > > > > > >>>>> Maybe such an approach would allow us to implement the same
> > > > > > >>>>> features in SQL as well?
> > > > > > >>>>>
> > > > > > >>>>> Piotrek
> > > > > > >>>>>
> > > > > > >>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> Hi,
> > > > > > >>>>>>
> > > > > > >>>>>> Thank you all for the great proposal and discussion!
> > > > > > >>>>>> I also prefer to move on to the next step, so +1 for opening the
> > > > > > >>>>>> JIRAs to start the work.
> > > > > > >>>>>> We can have more detailed discussions there. Btw, we can start
> > > > > > >>>>>> with the JIRAs we have agreed on.
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Hequn
> > > > > > >>>>>>
> > > > > > >>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> +1. I agree that we should open the JIRAs to start the work. We
> > > > > > >>>>>>> may have better ideas on the flavor of the interface when we
> > > > > > >>>>>>> implement/review the code.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Regards,
> > > > > > >>>>>>> shaoxuan
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>>>>>> Hi all,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thanks all for the feedback.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> @Piotr About not using abbreviated names: +1, I like your
> > > > > > >>>>>>>> proposal! Currently both the DataSet and DataStream APIs use
> > > > > > >>>>>>>> `aggregate`. BTW, I find that other languages, such as R, also
> > > > > > >>>>>>>> don't use abbreviated names.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Sometimes the interface of an API is really difficult to
> > > > > > >>>>>>>> perfect; we need to spend a lot of time thinking, gather feedback
> > > > > > >>>>>>>> from a large number of users, and constantly improve. But because
> > > > > > >>>>>>>> of backward compatibility, we have to adopt the most conservative
> > > > > > >>>>>>>> approach when designing the API (of course, I am more in favor of
> > > > > > >>>>>>>> developing richer features once we have discussed them clearly).
> > > > > > >>>>>>>> Therefore, I propose to divide the implementation of
> > > > > > >>>>>>>> map/flatMap/agg/flatAgg into JIRAs for the basic functionality and
> > > > > > >>>>>>>> JIRAs that support time attributes and group keys. We can develop
> > > > > > >>>>>>>> the features whose design we have already agreed on, and we will
> > > > > > >>>>>>>> continue to discuss the uncertain design.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> In fact, in addition to the design of APIs, there will be various
> > > > > > >>>>>>>> performance optimization details, such as: the table aggregate
> > > > > > >>>>>>>> function's emitValue will generate multiple calculation results; in
> > > > > > >>>>>>>> extreme cases, each record will trigger a large number of retract
> > > > > > >>>>>>>> messages, and this will have poor performance. So we will also
> > > > > > >>>>>>>> optimize the interface design, e.g., by adding the
> > > > > > >>>>>>>> emitWithRetractValue interface (I have updated the Google doc) to
> > > > > > >>>>>>>> allow the user to optionally perform incremental calculations,
> > > > > > >>>>>>>> thus avoiding a large number of retractions. Details like this are
> > > > > > >>>>>>>> difficult to discuss fully on the mailing list, so I recommend
> > > > > > >>>>>>>> creating JIRAs/a FLIP first; we develop the designs that have been
> > > > > > >>>>>>>> agreed upon and continue to discuss the undecided designs! What do
> > > > > > >>>>>>>> you think? @Fabian & Piotr & Xiaowei
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>> Jincheng
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> I appreciate your concerns, both on timestamp attributes as well
> > > > > > >>>>>>>>> as on implicit group keys. At the same time, I'm also concerned
> > > > > > >>>>>>>>> with the proposed approach of allowing Expression* as parameters,
> > > > > > >>>>>>>>> especially for flatMap/flatAgg. So far, we have never allowed a
> > > > > > >>>>>>>>> scalar expression to appear together with table expressions. With
> > > > > > >>>>>>>>> the Expression* approach, this will happen for the parameters to
> > > > > > >>>>>>>>> flatMap/flatAgg. I'm a bit concerned about whether we fully
> > > > > > >>>>>>>>> understand the consequences when we try to extend our system in
> > > > > > >>>>>>>>> the future. I would be extra cautious in doing this. To avoid
> > > > > > >>>>>>>>> this, I think an implicit group key for flatAgg is safer. For
> > > > > > >>>>>>>>> flatMap, if users want to keep the rowtime column, they can use
> > > > > > >>>>>>>>> crossApply/join instead. So we are not losing any real
> > > > > > >>>>>>>>> functionality here.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Also a clarification on the following example:
> > > > > > >>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > >>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > >>>>>>>>>  .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > >>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > >>>>>>>>> If we did not have the select clause in this example, we would
> > > > > > >>>>>>>>> have 'w as a regular column in the output. It should not
> > > > > > >>>>>>>>> magically disappear.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> The concern is not as strong for Table.map/Table.agg because we
> > > > > > >>>>>>>>> are not mixing scalar and table expressions. But we also want to
> > > > > > >>>>>>>>> be a bit consistent with these methods. If we used implicit group
> > > > > > >>>>>>>>> keys for Table.flatAgg, we probably should do the same for
> > > > > > >>>>>>>>> Table.agg. Now we only have to choose what to do with Table.map. I
> > > > > > >>>>>>>>> can see good arguments from both sides. But starting with a single
> > > > > > >>>>>>>>> Expression seems safer because we can always extend to Expression*
> > > > > > >>>>>>>>> in the future.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> While thinking about this problem, it appears that we may need
> > > > > > >>>>>>>>> more work in our handling of watermarks for SQL/Table API. Our
> > > > > > >>>>>>>>> current way of propagating the watermarks from source all the way
> > > > > > >>>>>>>>> to sink might not be optimal. For example, after a tumbling
> > > > > > >>>>>>>>> window, the watermark can actually be advanced to just before the
> > > > > > >>>>>>>>> expiration of the next window. I think that, in general, each
> > > > > > >>>>>>>>> operator may need to generate new watermarks instead of simply
> > > > > > >>>>>>>>> propagating them. Once we accept that watermarks may change during
> > > > > > >>>>>>>>> execution, it appears that the timestamp columns may also change,
> > > > > > >>>>>>>>> as long as we have some way to associate a watermark with them. My
> > > > > > >>>>>>>>> intuition is that once we have a thorough solution for the
> > > > > > >>>>>>>>> watermark issue, we may be able to solve the problem we
> > > > > > >>>>>>>>> encountered for Table.map in a cleaner way. But this is a complex
> > > > > > >>>>>>>>> issue which deserves a discussion of its own.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Regards,
> > > > > > >>>>>>>>> Xiaowei
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Hi,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Isn't the problem of multiple expressions limited only to the
> > > > > > >>>>>>>>>> `flat***` functions, and to be more specific, only to having two
> > > > > > >>>>>>>>>> (or more) different table functions passed as expressions?
> > > > > > >>>>>>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b),
> > > > > > >>>>>>>>>> scalarFunction2('c))` seems to be well defined (duplicate the
> > > > > > >>>>>>>>>> result of every scalar function to every record). Or am I missing
> > > > > > >>>>>>>>>> something?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Another remark: I would be in favour of not using abbreviations
> > > > > > >>>>>>>>>> and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Piotrek
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi Jincheng,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> I said before that I think the append() method is better than
> > > > > > >>>>>>>>>>> implicitly forwarding keys, but still, I believe it adds
> > > > > > >>>>>>>>>>> unnecessary boilerplate code.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Moreover, I haven't seen a convincing argument why
> > > > > > >>>>>>>>>>> map(Expression*) is worse than map(Expression). In either case we
> > > > > > >>>>>>>>>>> need to do all kinds of checks to prevent invalid use of
> > > > > > >>>>>>>>>>> functions.
> > > > > > >>>>>>>>>>> If the method is not used correctly, we can emit a good error
> > > > > > >>>>>>>>>>> message, and documenting map(Expression*) will be easier than
> > > > > > >>>>>>>>>>> map(append(Expression*)), in my opinion.
> > > > > > >>>>>>>>>>> I think we should not add unnecessary syntax unless there is a
> > > > > > >>>>>>>>>>> good reason, and to be honest, I haven't seen this reason yet.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just
> > > > > > >>>>>>>>>>> like any other method, i.e., not do any implicit forwarding.
> > > > > > >>>>>>>>>>> Let's take the example of the windowed group by that you posted
> > > > > > >>>>>>>>>>> before.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > >>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > >>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type
> > > > > > >>>>>>>>>>> of the field 'w in the resulting Table? Is it a regular field at
> > > > > > >>>>>>>>>>> all or just a system field that disappears if it is not selected?
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
> > > > > > >>>>>>>>>>> aligned with the regular window.groupBy.select aggregations that
> > > > > > >>>>>>>>>>> are supported today.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > >>>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best, Fabian
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Hi Fabian/Xiaowei,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I am very sorry for my late reply! Glad to see your reply, and
> > > > > > >>>>>>>>>>>> it sounds pretty good!
> > > > > > >>>>>>>>>>>> I agree with Fabian that the approach with append(), which
> > > > > > >>>>>>>>>>>> clearly defines the result schema, is better.
> > > > > > >>>>>>>>>>>> In addition, append() can also contain non-time attributes,
> > > > > > >>>>>>>>>>>> e.g.:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
> > > > > > >>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
> > > > > > >>>>>>>>>>>> 'address, 'rowtime))
> > > > > > >>>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime as 'w)
> > > > > > >>>>>>>>>>>> .groupBy('w, 'address)
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> In this way, append() is very useful, and its behavior is very
> > > > > > >>>>>>>>>>>> similar to withForwardedFields() in DataSet.
> > > > > > >>>>>>>>>>>> So +1 to using the append() approach for map() & flatMap()!
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case I
> > > > > > >>>>>>>>>>>> agree with Xiaowei's approach of defining the keys to be implied
> > > > > > >>>>>>>>>>>> in the result table and to appear at the beginning, for example:
> > > > > > >>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > >>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > >>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > >>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> What do you think? @Fabian @Xiaowei
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>> Jincheng
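A minimal Scala sketch of the "keys first" rule for agg/flatAgg described above, modelling schemas as plain lists of column names rather than using the Flink Table API. The object and method names (`ImpliedKeysSchema`, `resultSchema`, `renameAll`) and the aggregate field names `f0`/`f1` are hypothetical.

```scala
object ImpliedKeysSchema {
  // Result schema under the "keys first" rule: the group keys (including the
  // window alias) come first, followed by the aggregate's own result fields.
  def resultSchema(groupKeys: List[String], aggFields: List[String]): List[String] =
    groupKeys ++ aggFields

  // Models .as(...): every column is renamed positionally, so the number of
  // new names must match the number of result columns.
  def renameAll(schema: List[String], names: List[String]): List[(String, String)] = {
    require(schema.length == names.length,
      s"expected ${schema.length} names but got ${names.length}")
    schema.zip(names)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical: agg('a) produces two fields named f0 and f1.
    val schema = resultSchema(List("w", "k1", "k2"), List("f0", "f1"))
    println(schema)
    println(renameAll(schema, List("w", "k1", "k2", "col1", "col2")))
  }
}
```

Under this rule, `.as('w, 'k1, 'k2, 'col1, 'col2)` has to list exactly one name per result column, which is what the `require` check models.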
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hi Jincheng,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thanks for the summary!
> > > > > > >>>>>>>>>>>>> I like the approach with append() better than the implicit forwarding, as
> > > > > > >>>>>>>>>>>>> it clearly indicates which fields are forwarded.
> > > > > > >>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*) variant, as
> > > > > > >>>>>>>>>>>>> we would still need to analyze the full expression tree to ensure that at
> > > > > > >>>>>>>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>> Fabian
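The check Fabian describes amounts to a traversal of the argument expressions. The sketch below assumes a toy expression tree (`Field`, `UdfCall`, `Alias`), which is hypothetical and much simpler than Flink's real expression classes, and applies the "exactly one UDF" interpretation.

```scala
object UdfCheck {
  // Hypothetical, heavily simplified expression tree (not Flink's Expression classes).
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class UdfCall(name: String, args: List[Expr]) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Counts UDF calls anywhere in the tree, including nested arguments.
  def countUdfs(e: Expr): Int = e match {
    case Field(_)         => 0
    case UdfCall(_, args) => 1 + args.map(countUdfs).sum
    case Alias(child, _)  => countUdfs(child)
  }

  // Validates the arguments of a hypothetical flatMap(Expression*) call
  // using the "exactly one UDF" interpretation.
  def validate(exprs: Seq[Expr]): Either[String, Unit] = {
    val n = exprs.map(countUdfs).sum
    if (n == 1) Right(()) else Left(s"expected exactly one UDF call, found $n")
  }

  def main(args: Array[String]): Unit = {
    // flatMap(udf('string), 'long): one UDF plus a forwarded field, accepted
    println(validate(Seq(UdfCall("udf", List(Field("string"))), Field("long"))))
    // flatMap(udfA('a), udfB('b)): two UDFs, rejected
    println(validate(Seq(UdfCall("udfA", List(Field("a"))), UdfCall("udfB", List(Field("b"))))))
  }
}
```

Changing the condition to `n <= 1` would implement the "at most one" reading instead.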
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> We are discussing this proposal in great detail, trying to design the API
> > > > > > >>>>>>>>>>>>>> with many aspects in mind (functionality, compatibility, ease of use,
> > > > > > >>>>>>>>>>>>>> etc.). I think this is a very good process: only such a detailed discussion
> > > > > > >>>>>>>>>>>>>> will let us develop the PR more clearly and smoothly at a later stage. I am
> > > > > > >>>>>>>>>>>>>> very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
> > > > > > >>>>>>>>>>>>>> Regarding the definition of the method signatures, I want to share my
> > > > > > >>>>>>>>>>>>>> points here, which I am discussing with Fabian in the Google doc (not yet
> > > > > > >>>>>>>>>>>>>> completed), as follows:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Assume we have a table:
> > > > > > >>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Approach 1:
> > > > > > >>>>>>>>>>>>>> case1: Map follows Source Table
> > > > > > >>>>>>>>>>>>>> val result =
> > > > > > >>>>>>>>>>>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
> > > > > > >>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
> > > > > > >>>>>>>>>>>>>> val result =
> > > > > > >>>>>>>>>>>>>>   tab.window(Tumble ... as 'w)
> > > > > > >>>>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > >>>>>>>>>>>>>>    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> > > > > > >>>>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema is
> > > > > > >>>>>>>>>>>>>> clearly defined, but with an added built-in append UDF. This makes the
> > > > > > >>>>>>>>>>>>>> map/flatMap/agg/flatAgg interfaces accept only one Expression.
> > > > > > >>>>>>>>>>>>>> val result =
> > > > > > >>>>>>>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
> > > > > > >>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any column.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> So, maybe we can define table.map(Expression) first and, if necessary,
> > > > > > >>>>>>>>>>>>>> extend it to table.map(Expression*) in the future? Of course, I also hope
> > > > > > >>>>>>>>>>>>>> that we can further improve this proposal through discussion.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>>>> Jincheng
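A row-level sketch of how the proposed `append` built-in could behave in Approach 2, assuming rows are plain `Vector[Any]` values and forwarded columns are addressed by position. `AppendSketch` and the splitting UDF are hypothetical stand-ins, not Flink classes.

```scala
object AppendSketch {
  type Row = Vector[Any]

  // Hypothetical model of append(udf(...), cols...): the UDF's output columns
  // come first, followed by the forwarded input columns, unchanged.
  def append(udf: Row => Row, forwarded: Seq[Int])(in: Row): Row =
    udf(in) ++ forwarded.map(i => in(i))

  def main(args: Array[String]): Unit = {
    // Input row with the schema ('long, 'string, 'proctime).
    val input: Row = Vector(42L, "a,b", 1000L)

    // Hypothetical udf('string) that splits the string into two columns.
    val split: Row => Row = r => {
      val parts = r(1).asInstanceOf[String].split(",", 2)
      Vector(parts(0), if (parts.length > 1) parts(1) else "")
    }

    // append(udf('string), 'long, 'proctime) forwards positions 0 and 2, giving
    // a row that matches .as('col1, 'col2, 'long, 'proctime).
    println(append(split, Seq(0, 2))(input))
  }
}
```

Because the forwarded columns are listed explicitly, the output schema `(col1, col2, long, proctime)` can be derived without guessing which fields are implied.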
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Hi Fabian,
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra
> > > > > > >>>>>>>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> > > > > > >>>>>>>>>>>>>>> that may appear more convenient in some cases. However, it might also cause
> > > > > > >>>>>>>>>>>>>>> some confusion. For example, do we allow multiple UDFs in these
> > > > > > >>>>>>>>>>>>>>> expressions? If we do, the semantics may be weird to define, e.g. what does
> > > > > > >>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
> > > > > > >>>>>>>>>>>>>>> Even though not allowing it may appear less powerful, it can also make
> > > > > > >>>>>>>>>>>>>>> things more intuitive. In the case of agg/flatAgg, we can define the keys to
> > > > > > >>>>>>>>>>>>>>> be implied in the result table and to appear at the beginning. You can use a
> > > > > > >>>>>>>>>>>>>>> select method if you want to modify this behavior. I think that eventually
> > > > > > >>>>>>>>>>>>>>> we will have some API which allows other expressions as additional
> > > > > > >>>>>>>>>>>>>>> parameters, but I think it's better to do that after we introduce the
> > > > > > >>>>>>>>>>>>>>> concept of nested tables. A lot of things we suggested here can be
> > > > > > >>>>>>>>>>>>>>> considered as special cases of that. But things are much simpler if we
> > > > > > >>>>>>>>>>>>>>> leave that to later.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>> Xiaowei
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Hi,
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> * Re emit:
> > > > > > >>>>>>>>>>>>>>>> I think we should start with the well-understood semantics of full
> > > > > > >>>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
> > > > > > >>>>>>>>>>>>>>>> As was said before, there are open questions regarding an append mode
> > > > > > >>>>>>>>>>>>>>>> (checkpointing, whether to support retractions or not and, if yes, how to
> > > > > > >>>>>>>>>>>>>>>> declare them, ...).
> > > > > > >>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> * Re grouping keys:
> > > > > > >>>>>>>>>>>>>>>> I don't think we should add them automatically because the result schema
> > > > > > >>>>>>>>>>>>>>>> would not be intuitive.
> > > > > > >>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end? Which
> > > > > > >>>>>>>>>>>>>>>> metadata fields of windows would be added? In which order would they be
> > > > > > >>>>>>>>>>>>>>>> added?
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> However, we could support syntax like this:
> > > > > > >>>>>>>>>>>>>>>> val t: Table = ???
> > > > > > >>>>>>>>>>>>>>>> t
> > > > > > >>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
> > > > > > >>>>>>>>>>>>>>>>   .groupBy('a, 'b)
> > > > > > >>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend,
> > > > > > >>>>>>>>>>>>>>>> rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> * Re multi-staged evaluation:
> > > > > > >>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the UDF
> > > > > > >>>>>>>>>>>>>>>> implements the merge() method.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Best, Fabian
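A small sketch of how the result schema of the explicit `flatAgg(...)` syntax above could be derived: every argument contributes its column(s) in argument order, and the table aggregate expands into its result attributes. The argument kinds (`Key`, `WindowProp`, `TableAgg`) are hypothetical simplifications, not Flink types.

```scala
object ExplicitFlatAggSchema {
  // Hypothetical argument kinds of flatAgg(...) in the syntax sketched above.
  sealed trait Arg
  final case class Key(name: String) extends Arg                    // e.g. 'b, 'a
  final case class WindowProp(alias: String) extends Arg            // e.g. 'w.end as 'wend
  final case class TableAgg(resultFields: List[String]) extends Arg // e.g. myAgg(row('*))

  // Columns appear exactly in argument order; the table aggregate expands
  // into its result attributes f1 ... fn at its own position.
  def resultSchema(args: List[Arg]): List[String] = args.flatMap {
    case Key(n)        => List(n)
    case WindowProp(a) => List(a)
    case TableAgg(fs)  => fs
  }

  def main(args: Array[String]): Unit = {
    val schema = resultSchema(List(
      Key("b"), Key("a"), TableAgg(List("f1", "f2")), WindowProp("wend"), WindowProp("rtime")))
    println(schema)
  }
}
```

With `f1` and `f2` as the aggregate's attributes, the example yields `List(b, a, f1, f2, wend, rtime)`, matching the schema stated in the message.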
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> Hi xiaowei,
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit are
> > > > > > >>>>>>>>>>>>>>>>> much more complex than those of AggregateFunction. The fundamental
> > > > > > >>>>>>>>>>>>>>>>> difference is that TableAggregateFunction emits a "table" while
> > > > > > >>>>>>>>>>>>>>>>> AggregateFunction outputs (a column of) a "row". AggregateFunction has only
> > > > > > >>>>>>>>>>>>>>>>> one mode, "replacing" (complete update). But TableAggregateFunction could
> > > > > > >>>>>>>>>>>>>>>>> use either incremental update (only emit the newly updated results) or
> > > > > > >>>>>>>>>>>>>>>>> complete update (always emit the entire table when "emit" is triggered).
> > > > > > >>>>>>>>>>>>>>>>> From the performance perspective, we might want to use incremental update.
> > > > > > >>>>>>>>>>>>>>>>> But we need to review and design this carefully, especially taking into
> > > > > > >>>>>>>>>>>>>>>>> account failover (instead of just backing up the ACC, it may also need to
> > > > > > >>>>>>>>>>>>>>>>> remember the emit offset) and retractions, as the semantics of
> > > > > > >>>>>>>>>>>>>>>>> TableAggregateFunction emit are different from those of other UDFs.
> > > > > > >>>>>>>>>>>>>>>>> TableFunction also emits a table, but it does not need to worry about this
> > > > > > >>>>>>>>>>>>>>>>> because it is stateless.
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>> Shaoxuan
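To make the difference between the two emit modes concrete, here is a sketch using a hypothetical Top-2 table aggregate over integers; it is not Flink's `TableAggregateFunction` interface. The complete-update mode re-emits the whole result, while the incremental mode emits retractions and additions and therefore has to keep the previously emitted result as extra state, which is the failover concern raised above.

```scala
object EmitModes {
  // Accumulator of a hypothetical Top-2 table aggregate (largest values first).
  final case class Acc(top: List[Int])

  def accumulate(acc: Acc, v: Int): Acc =
    Acc((v :: acc.top).sorted(Ordering[Int].reverse).take(2))

  // Complete update ("replacing"): each emit outputs the whole current result table.
  def emitFull(acc: Acc): List[Int] = acc.top

  // Incremental update: only the difference to the previous emission, as
  // (retractions, additions). This needs the previous output as extra state.
  def emitIncremental(previous: List[Int], acc: Acc): (List[Int], List[Int]) =
    (previous.diff(acc.top), acc.top.diff(previous))

  def main(args: Array[String]): Unit = {
    var acc = Acc(Nil)
    var lastEmitted = List.empty[Int] // would have to be checkpointed together with acc
    for (v <- List(3, 7, 5)) {
      acc = accumulate(acc, v)
      val (retract, add) = emitIncremental(lastEmitted, acc)
      println(s"after $v: full=${emitFull(acc)} retract=$retract add=$add")
      lastEmitted = emitFull(acc)
    }
  }
}
```

For the inputs 3, 7 and 5, the full mode emits `List(3)`, `List(7, 3)` and `List(7, 5)`; in the last step the incremental mode emits only a retraction of 3 and an addition of 5.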
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> Hi Jincheng,
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very good
> > > > > > >>>>>>>>>>>>>>>>>> start. There are a few points that we need to discuss further.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the
> > > > > > >>>>>>>>>>>>>>>>>>   most complex user-defined object we have introduced so far. I think there
> > > > > > >>>>>>>>>>>>>>>>>>   are quite a few interesting questions here. For example, do we allow
> > > > > > >>>>>>>>>>>>>>>>>>   multi-staged TableAggregate in this case? What are the semantics of emit?
> > > > > > >>>>>>>>>>>>>>>>>>   Is it an amendment to the previous output, or does it replace it? I think
> > > > > > >>>>>>>>>>>>>>>>>>   that this subject itself is worth a discussion to make sure we get the
> > > > > > >>>>>>>>>>>>>>>>>>   details right.
> > > > > > >>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the output?
> > > > > > >>>>>>>>>>>>>>>>>>   What about the case of windowed aggregation?
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>> Xiaowei
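Regarding multi-staged TableAggregate: as Fabian suggests in his reply, a local/global split is possible if the function can merge partial accumulators. A minimal sketch with a hypothetical Top-2 accumulator and `merge` (not Flink's interface), checking that two-phase evaluation gives the same result as single-phase evaluation:

```scala
object TwoPhaseTopN {
  // Hypothetical Top-2 accumulator, largest values first.
  final case class Acc(top: List[Int])

  private val desc = Ordering[Int].reverse

  def accumulate(acc: Acc, v: Int): Acc = Acc((v :: acc.top).sorted(desc).take(2))

  // merge() combines two partial accumulators; its existence is what makes a
  // local/global (multi-staged) evaluation possible.
  def merge(a: Acc, b: Acc): Acc = Acc((a.top ++ b.top).sorted(desc).take(2))

  // Single stage: all values of a group are accumulated in one place.
  def singlePhase(values: List[Int]): Acc = values.foldLeft(Acc(Nil))(accumulate)

  // Two stages: each partition pre-aggregates locally, then the partial
  // accumulators are merged.
  def twoPhase(partitions: List[List[Int]]): Acc =
    partitions.map(singlePhase).foldLeft(Acc(Nil))(merge)

  def main(args: Array[String]): Unit = {
    println(singlePhase(List(3, 7, 5, 1)))
    println(twoPhase(List(List(3, 7), List(5, 1))))
  }
}
```

The equivalence holds here because the Top-2 of the merged partial results equals the Top-2 of all values; a table aggregate without such a `merge` would have to be evaluated in a single stage.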
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Hi, Xiaowei,
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it is a good summary of our
> > > > > > >>>>>>>>>>>>>>>>>>> offline discussions. But from my point of view, we should add the usage of
> > > > > > >>>>>>>>>>>>>>>>>>> the public interfaces that we will introduce in this proposal. So, I added
> > > > > > >>>>>>>>>>>>>>>>>>> the following usage descriptions of the interfaces and operators to the
> > > > > > >>>>>>>>>>>>>>>>>>> Google doc:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> 1. Map Operator
> > > > > > >>>>>>>>>>>>>>>>>>> Map is a new operator of Table. The Map operator applies a scalar function
> > > > > > >>>>>>>>>>>>>>>>>>> and can return multiple columns. The usage is as follows:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > >>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
> > > > > > >>>>>>>>>>>>>>>>>>> FlatMap is a new operator of Table. The FlatMap operator applies a table
> > > > > > >>>>>>>>>>>>>>>>>>> function and can return multiple rows. The usage is as follows:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > >>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> 3. Agg Operator
> > > > > > >>>>>>>>>>>>>>>>>>> Agg is a new operator of Table/GroupedTable. The Agg operator applies an
> > > > > > >>>>>>>>>>>>>>>>>>> aggregate function and can return multiple columns. The usage is as follows:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > >>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave the groupBy clause out to define global aggregates
> > > > > > >>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
> > > > > > >>>>>>>>>>>>>>>>>>> FlatAgg is a new operator of Table/GroupedTable. The FlatAgg operator
> > > > > > >>>>>>>>>>>>>>>>>>> applies a table aggregate function and can return multiple rows. The usage
> > > > > > >>>>>>>>>>>>>>>>>>> is as follows:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > >>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave the groupBy clause out to define global table aggregates
> > > > > > >>>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
> > > > > > >>>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of GroupReduceFunction,
> > > > > > >>>>>>>>>>>>>>>>>>> which computes over a group of elements and outputs a group of elements.
> > > > > > >>>>>>>>>>>>>>>>>>> The TableAggregateFunction can be applied on GroupedTable.flatAgg(). The
> > > > > > >>>>>>>>>>>>>>>>>>> interface of TableAggregateFunction has a lot of content, so I don't copy
> > > > > > >>>>>>>>>>>>>>>>>>> it here. Please look at the details in the Google doc:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> I would greatly appreciate anyone reviewing and commenting.
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>>>>> Jincheng
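The four operators can be compared with operations on plain Scala collections. The sketch below is only an analogy under that assumption (a `List` of a hypothetical `Row` case class stands in for a `Table`, and a Top-2 stands in for a table aggregate); it does not use the Flink API.

```scala
object OperatorAnalogies {
  // Hypothetical input row; plain Scala collections stand in for a Table.
  final case class Row(k: String, words: String, v: Int)

  val tab = List(Row("x", "a b", 1), Row("x", "c", 4), Row("y", "d e", 2))

  // map: one row in, one row out with several columns (scalar function).
  def mapOp(t: List[Row]): List[(String, Int, Int)] =
    t.map(r => (r.k, r.v, r.v * 10))

  // flatMap: one row in, zero or more rows out (table function).
  def flatMapOp(t: List[Row]): List[(String, String)] =
    t.flatMap(r => r.words.split(" ").toList.map(w => (r.k, w)))

  // agg: one group in, one row out with several columns (aggregate function).
  def aggOp(t: List[Row]): Set[(String, Int, Int)] =
    t.groupBy(_.k).toList.map { case (k, rs) => (k, rs.map(_.v).sum, rs.size) }.toSet

  // flatAgg: one group in, several rows out (table aggregate, here a Top-2,
  // similar to what a GroupReduceFunction can do).
  def flatAggOp(t: List[Row]): Set[(String, Int)] =
    t.groupBy(_.k).toList.flatMap { case (k, rs) =>
      rs.map(_.v).sorted(Ordering[Int].reverse).take(2).map(v => (k, v))
    }.toSet

  def main(args: Array[String]): Unit = {
    println(mapOp(tab))
    println(flatMapOp(tab))
    println(aggOp(tab))
    println(flatAggOp(tab))
  }
}
```

Since `groupBy` does not preserve group order, the grouped results are returned as sets.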
> > > > > > >>>>>>>>>>>>>>>>>>>