Hi everyone,

Thanks for all your feedback! I have started a vote for this FLIP [1];
please vote there or ask additional questions here [2].

[1] https://lists.apache.org/thread/nr9wwf98fkw1tk7ycgbcfjjo5g4x8pmz
[2] https://lists.apache.org/thread/m9hj60p3mntyctkbxrksm8l4d0s4q9dw

Best,
Lincoln Lee


On Tue, Sep 20, 2022 at 18:21, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Fine by me. Thanks for driving this, Lincoln :)
>
> Best, Piotrek
>
> On Tue, Sep 20, 2022 at 09:06, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi all,
> >    I'll start a vote if there are no more objections by this
> > Thursday (9/22). Looking forward to your feedback!
> >
> > [1] FLIP-260:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > [2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, Sep 19, 2022 at 17:38, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >
> > > Hi Jingsong,
> > >    Thank you for participating in this discussion! For the method name,
> > > I think we should follow the new finish() method in `StreamOperator`;
> > > `BoundedOneInput` might be removed in the future, as discussed before [1].
> > >
> > > [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > On Mon, Sep 19, 2022 at 10:13, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > >> +1 to adding a `finish()` method to `TableFunction` only.
> > >>
> > >> Can we use `endInput` just like `BoundedOneInput`?
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee <lincoln.8...@gmail.com>
> > >> wrote:
> > >> >
> > >> > Hi Dawid, Piotr,
> > >> >    I agree with you that we should add the finish() method to
> > >> > `TableFunction` only. Other `UserDefinedFunction`s (`ScalarFunction`,
> > >> > `AggregateFunction`, `TableAggregateFunction`) do not necessarily need
> > >> > the finish method (they cannot emit records in the legacy close()
> > >> > method).
> > >> >
> > >> > A `TableFunction` is used to correlate with the left table/stream. The
> > >> > following example shows a case where the user only selects columns
> > >> > from the correlated 'FeatureTF' (no column of the left table is
> > >> > selected):
> > >> > ```
> > >> > SELECT feature1, feature2, feature3
> > >> > FROM MyTable t1
> > >> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
> > >> > feature3) ON TRUE
> > >> > ```
> > >> > Here 'FeatureTF' can do some flushing work in the legacy close()
> > >> > method without breaking any SQL semantics, so I don't see any reason
> > >> > why we should prevent users from doing flushing work in the new
> > >> > finish() method. I've updated the FLIP doc to limit the change to
> > >> > `TableFunction` only [1].
> > >> >
> > >> > Regarding the more powerful `ProcessFunction`, I'd like to share some
> > >> > thoughts: there are indeed requirements for advanced usage in
> > >> > Table/SQL, even for a further UD-Operator, e.g., a UD-Join for
> > >> > user-controlled join logic that cannot simply be expressed in SQL.
> > >> > This is an interesting topic, and I look forward to more discussion
> > >> > on it.
> > >> >
> > >> >
> > >> > [1]
> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > >> >
> > >> > Best,
> > >> > Lincoln Lee
> > >> >
> > >> >
> > >> > On Thu, Sep 15, 2022 at 22:39, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> >
> > >> > > Hi Dawid, Lincoln,
> > >> > >
> > >> > > I would tend to agree with Dawid. It seems to me that
> > >> > > `TableFunction` is the one that needs to be taken care of. Other
> > >> > > types of `UserDefinedFunction` wouldn't be able to emit anything
> > >> > > from `finish()` even if we added it. And if we added
> > >> > > `finish(Collector<T> out)` to them, it would create the same
> > >> > > problems (how to pass the output type) that prevented us from adding
> > >> > > `finish()` to all functions in the DataStream API.
> > >> > >
> > >> > > However, I'm not sure what the long-term solution for the Table API
> > >> > > should be. For the DataStream API we wanted to provide a new, better
> > >> > > and more powerful `ProcessFunction` for all of the unusual use cases
> > >> > > that currently require the use of the `StreamOperator` API instead
> > >> > > of `DataStream` functions. I don't know what the alternative would
> > >> > > be in the Table API.
> > >> > >
> > >> > > Dawid, who do you think we should ping from the Table API/SQL teams
> > >> > > to chip in?
> > >> > >
> > >> > > Best,
> > >> > > Piotrek
> > >> > >
> > >> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org>
> > >> > > wrote:
> > >> > >
> > >> > > > Hey Lincoln,
> > >> > > >
> > >> > > > Thanks for opening the discussion.
> > >> > > >
> > >> > > > To be honest, I am not convinced that emitting from close() is a
> > >> > > > contract that was envisioned and thus should be maintained. As far
> > >> > > > as I can see, it only affects the TableFunction, because it has
> > >> > > > the collect method. None of the other UDFs (ScalarFunction,
> > >> > > > AggregateFunction) have a means to emit records from close().
> > >> > > >
> > >> > > > To be honest, I am not sure what the consequences would be for the
> > >> > > > interplay with other operators that expect a TableFunction to emit
> > >> > > > only when eval is called. I'm not sure whether any such operators
> > >> > > > exist.
> > >> > > >
> > >> > > > If it is something we are certain we want to support, I'd be much
> > >> > > > more comfortable adding finish() to the TableFunction instead. I
> > >> > > > would be happy to hear opinions from the Table API folks.
> > >> > > >
> > >> > > > Best,
> > >> > > >
> > >> > > > Dawid
> > >> > > >
> > >> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
> > >> > > > > Thanks @Piotr for your valuable input!
> > >> > > > >
> > >> > > > > I did a quick read of the previous discussion you mentioned. It
> > >> > > > > seems my FLIP title doesn't give a clear scope and causes some
> > >> > > > > confusion. If my understanding is correct, the UDFs in your
> > >> > > > > context are user-implemented
> > >> > > > > `org.apache.flink.api.common.functions.Function`s, while the
> > >> > > > > `UserDefinedFunction` I mentioned in the FLIP is limited to the
> > >> > > > > flink-table module, located in `org.apache.flink.table.functions`.
> > >> > > > >
> > >> > > > > Here's a use case we've met recently (which is indeed the
> > >> > > > > motivation for this proposal): one of our users implemented a
> > >> > > > > `org.apache.flink.table.functions.TableFunction`; the simplified
> > >> > > > > pseudo-code is as follows:
> > >> > > > >
> > >> > > > > ```
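> > >> > > > > public class XFunction extends TableFunction<Out> {
> > >> > > > >
> > >> > > > >    @Override
> > >> > > > >    public void open(FunctionContext context) throws Exception {
> > >> > > > >        initMemQueue();
> > >> > > > >        initPythonProc();
> > >> > > > >    }
> > >> > > > >
> > >> > > > >    public void eval(In in) {
> > >> > > > >        queue.offer(in);
> > >> > > > >        // emit a result only if one is already available
> > >> > > > >        Out out = queue.poll();
> > >> > > > >        if (out != null) {
> > >> > > > >            collect(out);
> > >> > > > >        }
> > >> > > > >    }
> > >> > > > >
> > >> > > > >    @Override
> > >> > > > >    public void close() throws Exception {
> > >> > > > >        // legacy 'flush': wait for the Python process, then emit
> > >> > > > >        // every result still left in the queue
> > >> > > > >        waitForPythonFinish();
> > >> > > > >        List<Out> outputs = drainQueue();
> > >> > > > >        outputs.forEach(out -> collect(out));
> > >> > > > >    }
> > >> > > > > }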
> > >> > > > > ```
> > >> > > > > It worked well in older Flink versions, but when they recently
> > >> > > > > attempted an upgrade, the 'flush' logic in the legacy close
> > >> > > > > method of `TableFunction` no longer worked properly.
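The buffering pattern of `XFunction` above, with its flush moved from close() into the finish() hook this FLIP proposes, might look roughly like the following. This is a minimal, self-contained sketch and not Flink code: `BufferingFunction`, its `capacity` parameter, and the `Consumer<String>` collector are hypothetical stand-ins for `TableFunction` and its collector, and the Python process is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class BufferedFunctionSketch {

    // Hypothetical stand-in for a buffering TableFunction (not Flink's class).
    static final class BufferingFunction {
        private final Deque<String> queue = new ArrayDeque<>();
        private final int capacity;
        private Consumer<String> collector;

        BufferingFunction(int capacity) {
            this.capacity = capacity;
        }

        void setCollector(Consumer<String> collector) {
            this.collector = collector;
        }

        void open() {
            queue.clear();
        }

        // Buffer the input; emit the oldest element once the buffer exceeds capacity.
        void eval(String in) {
            queue.offer(in);
            if (queue.size() > capacity) {
                collector.accept(queue.poll());
            }
        }

        // Flush here instead of in close(): output is still accepted in this phase.
        void finish() {
            while (!queue.isEmpty()) {
                collector.accept(queue.poll());
            }
        }

        // close() only releases resources and no longer emits records.
        void close() {
            queue.clear();
        }
    }

    // Drives the function through open -> eval* -> finish -> close.
    static List<String> run(List<String> inputs, int capacity) {
        List<String> out = new ArrayList<>();
        BufferingFunction f = new BufferingFunction(capacity);
        f.setCollector(out::add);
        f.open();
        for (String in : inputs) {
            f.eval(in);
        }
        f.finish();
        f.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d"), 2));
    }
}
```

Running `main` prints `[a, b, c, d]`: the first two records are emitted from eval() once the buffer exceeds its capacity, and the remaining two come from finish(), so nothing depends on close() being able to emit.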
> > >> > > > >
> > >> > > > > Before proposing the FLIP, I also considered a `flush()`
> > >> > > > > extension on `org.apache.flink.api.common.functions.Function`,
> > >> > > > > because some SQL operators are also affected, but it is currently
> > >> > > > > not included in the scope of this FLIP; maybe we can discuss it
> > >> > > > > in another thread.
> > >> > > > >
> > >> > > > > I hope this helps explain the reasoning, and your comments are
> > >> > > > > welcome!
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > Lincoln Lee
> > >> > > > >
> > >> > > > >
> > >> > > > > On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> > > > >
> > >> > > > >> Hi Lincoln,
> > >> > > > >>
> > >> > > > >> Thanks for the proposal. Have you seen the old discussion about
> > >> > > > >> adding this `finish()` method? [1] We didn't add it to UDFs, as
> > >> > > > >> we didn't see a motivation (maybe we missed something), and at
> > >> > > > >> the same time it wasn't that easy. A plain `finish()` wouldn't be
> > >> > > > >> enough. Users would need a way to output records from the
> > >> > > > >> `finish()` call, so it would have to be typed with the user
> > >> > > > >> record (`finish(Collector<T> output)`). On the other hand, we
> > >> > > > >> couldn't find an example where a user would actually need the
> > >> > > > >> `finish()` call in a UDF, as it seemed to us that it only makes
> > >> > > > >> sense for operators/functions that buffer records. Note that back
> > >> > > > >> then, during the discussion, we were referring to this method as
> > >> > > > >> `flush()` or `drain()`.
> > >> > > > >>
> > >> > > > >> Can you shed some more light and provide more details on the
> > >> > > > >> exact motivating example behind this proposal?
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Piotrek
> > >> > > > >>
> > >> > > > >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
> > >> > > > >>
> > >> > > > >> On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com>
> > >> > > > >> wrote:
> > >> > > > >>
> > >> > > > >>> Hello everyone,
> > >> > > > >>>
> > >> > > > >>>    I’d like to open a discussion on FLIP-260 [1]: Expose Finish
> > >> > > > >>> Method For UserDefinedFunction. This gives users who rely on
> > >> > > > >>> finish logic in the legacy close() method (< 1.14) a chance to
> > >> > > > >>> migrate to the new finish() method.
> > >> > > > >>>
> > >> > > > >>>    The task lifecycle was changed in FLINK-22972 [2]: a new
> > >> > > > >>> finish() phase was introduced (extracting the 'finish' part out
> > >> > > > >>> of 'close') and the dispose() method was removed. This change
> > >> > > > >>> was also applied in the table module (e.g.,
> > >> > > > >>> `AbstractMapBundleOperator` for mini-batch operations), but it
> > >> > > > >>> did not cover UserDefinedFunction, which only exposes the open()
> > >> > > > >>> and close() APIs for custom usage. Users who rely on the legacy
> > >> > > > >>> close() API may encounter wrong results or suffer runtime errors
> > >> > > > >>> after upgrading to the new version. Strictly speaking, it is a
> > >> > > > >>> bug caused by the breaking change, but because fixing it changes
> > >> > > > >>> a public API, we propose this FLIP.
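The phase ordering described above (open, eval, finish, close) can be illustrated with a toy model, assuming, as the message says, that records emitted from close() arrive after the task has already finished producing output. `ToyTableFunction`, `Output`, and the `sealed` flag are hypothetical and do not model Flink internals; the toy surfaces a late record as an `IllegalStateException`, whereas in a real job the symptom may be wrong results or other runtime errors.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    // Toy output: accepts records until it is sealed at end of input.
    static final class Output {
        final List<String> records = new ArrayList<>();
        boolean sealed;

        void collect(String record) {
            if (sealed) {
                throw new IllegalStateException("emitted after finish: " + record);
            }
            records.add(record);
        }
    }

    // Hypothetical stand-in for a table function; not Flink's TableFunction.
    abstract static class ToyTableFunction {
        private Output output;

        final void setOutput(Output output) {
            this.output = output;
        }

        protected final void collect(String record) {
            output.collect(record);
        }

        public void open() {}

        public abstract void eval(String in);

        // Models the finish() hook proposed in FLIP-260.
        public void finish() {}

        public void close() {}
    }

    // Pre-1.14 style: flushes from close().
    static final class CloseFlush extends ToyTableFunction {
        @Override public void eval(String in) { collect(in); }
        @Override public void close() { collect("flushed"); }
    }

    // Migrated style: flushes from finish(); close() only releases resources.
    static final class FinishFlush extends ToyTableFunction {
        @Override public void eval(String in) { collect(in); }
        @Override public void finish() { collect("flushed"); }
    }

    // Drives the phases in the order open -> eval* -> finish -> close.
    static List<String> run(ToyTableFunction f, List<String> inputs) {
        Output out = new Output();
        f.setOutput(out);
        f.open();
        for (String in : inputs) {
            f.eval(in);
        }
        f.finish();
        out.sealed = true; // end of input has been signalled downstream
        f.close();
        return out.records;
    }

    public static void main(String[] args) {
        System.out.println(run(new FinishFlush(), List.of("a", "b")));
        try {
            run(new CloseFlush(), List.of("a", "b"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this driver, `FinishFlush` yields `[a, b, flushed]`, while `CloseFlush` fails as soon as it emits from close(), which is the migration path the FLIP aims to open for table functions.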
> > >> > > > >>>
> > >> > > > >>>    Looking forward to your comments or feedback.
> > >> > > > >>>
> > >> > > > >>> [1]
> > >> > > > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> > >> > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
> > >> > > > >>>
> > >> > > > >>> Best,
> > >> > > > >>> Lincoln Lee
> > >> > > > >>>
> > >> > > >
> > >> > >
> > >>
> > >
> >
>
