Hi Dawid, Lincoln,

I would tend to agree with Dawid. It seems to me that `TableFunction` is
the one that needs to be taken care of. Other types of
`UserDefinedFunction` wouldn't be able to emit anything from `finish()`
even if we added it. And if we added `finish(Collector<T> out)` to them, it
would create the same problem (how to pass the output type) that prevented
us from adding `finish()` to all functions in the DataStream API.
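To illustrate why `TableFunction` is the special case, here is a minimal, self-contained sketch. All names in it (`SketchTableFunction`, `BufferingFunction`, `CloseFlushingFunction`, the local `Collector` interface and the `run` driver) are hypothetical stand-ins, not Flink APIs. The driver is only a simplified model of the open/eval/finish/close ordering introduced by FLINK-22972, not Flink's actual runtime. Because the base class fixes its output type `T`, a no-arg `finish()` can still emit through the inherited `collect(T)`. The same flush done in `close()` arrives after the output has been closed, which this model reports as an error (in a real job the exact symptom depends on the operator).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class LifecycleDemo {

    // Hypothetical typed output collector (a local stand-in, not Flink's Collector).
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical TableFunction-like base class: the output type T is fixed by
    // the class itself, so a no-arg finish() can emit via the inherited collect(T).
    abstract static class SketchTableFunction<T> {
        private Collector<T> collector;

        void setCollector(Collector<T> collector) {
            this.collector = collector;
        }

        protected final void collect(T record) {
            collector.collect(record);
        }

        public void open() {}

        // Hypothetical hook: called at end of input, while the output is still open.
        public void finish() {}

        // Called after finish(); meant for releasing resources only.
        public void close() {}
    }

    // Buffers results in eval() and flushes the remainder in finish().
    static class BufferingFunction extends SketchTableFunction<String> {
        private final Deque<String> buffer = new ArrayDeque<>();

        public void eval(String in) {
            buffer.offer(in.toUpperCase());
            if (buffer.size() > 1) { // keep one result pending to mimic buffering
                collect(buffer.poll());
            }
        }

        protected void flush() {
            while (!buffer.isEmpty()) {
                collect(buffer.poll());
            }
        }

        @Override
        public void finish() {
            flush();
        }
    }

    // Legacy pattern: the same flush done in close(), as in the XFunction example.
    static class CloseFlushingFunction extends BufferingFunction {
        @Override
        public void finish() {}

        @Override
        public void close() {
            flush();
        }
    }

    // Simplified driver: open -> eval* -> finish -> (output closed) -> close.
    static List<String> run(BufferingFunction fn, List<String> inputs) {
        List<String> output = new ArrayList<>();
        boolean[] outputClosed = {false};
        fn.setCollector(record -> {
            if (outputClosed[0]) {
                throw new IllegalStateException("output already closed: " + record);
            }
            output.add(record);
        });
        fn.open();
        for (String in : inputs) {
            fn.eval(in);
        }
        fn.finish();            // last point at which emitting is allowed
        outputClosed[0] = true; // downstream has already seen end of input
        fn.close();
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(new BufferingFunction(), List.of("a", "b", "c")));
        try {
            run(new CloseFlushingFunction(), List.of("a", "b", "c"));
        } catch (IllegalStateException e) {
            System.out.println("legacy close() flush failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints `[A, B, C]` for the `finish()`-based variant and then `legacy close() flush failed: output already closed: C` for the `close()`-based one. A `ScalarFunction`-style class, by contrast, returns its result from `eval()` and holds no collector, so an analogous `finish()` would have nothing to emit through unless it took a typed `Collector<T>` parameter, which is exactly the typing problem mentioned above.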

However, I'm not sure what the long-term solution for the Table
API should be. For the DataStream API, we wanted to provide a new, better and
more powerful `ProcessFunction` for all of the unusual use cases that currently
require the `StreamOperator` API instead of `DataStream` functions.
I don't know what the alternative would be in the Table API.

Dawid, who do you think we should ping from the Table API/SQL teams to chip
in?

Best,
Piotrek

On Thu, 15 Sep 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hey Lincoln,
>
> Thanks for opening the discussion.
>
> To be honest, I am not convinced that emitting from close() is a
> contract that was envisioned and thus should be maintained. As far as I
> can see, it affects only the TableFunction, because it has the
> collect method. None of the other UDFs (ScalarFunction,
> AggregateFunction) have a means to emit records from close().
>
> I am also not sure what the consequences would be for the interplay
> with other operators that expect a TableFunction to emit only when eval
> is called, or whether any such operators exist.
>
> If this is something we are certain we want to support, I'd be much
> more comfortable adding finish() to the TableFunction instead. I would be
> happy to hear opinions from the Table API folks.
>
> Best,
>
> Dawid
>
> On 14/09/2022 15:55, Lincoln Lee wrote:
> > Thanks @Piotr for your valuable input!
> >
> > I did a quick read of the previous discussion you mentioned. It seems my
> > FLIP title doesn't give a clear scope and has caused some confusion. If my
> > understanding is correct, the UDFs in your context are the user-implemented
> > `org.apache.flink.api.common.functions.Function`s, while the
> > `UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table
> > module, located in `org.apache.flink.table.functions`.
> >
> > Here's a use case we met recently (which is indeed the motivation for
> > this proposal): one of our users implemented a
> > `org.apache.flink.table.functions.TableFunction`; the simplified
> > pseudo-code is below:
> >
> > ```
> > class XFunction extends TableFunction<Out> {
> >
> >    @Override
> >    public void open(FunctionContext context) {
> >        initMemQueue();
> >        initPythonProc();
> >    }
> >
> >    public void eval(In in) {
> >        queue.offer(in);
> >        Out out = queue.poll();
> >        if (out != null) {
> >            collect(out);
> >        }
> >    }
> >
> >    @Override
> >    public void close() {
> >        // legacy 'flush': wait for Python, then emit everything left in the queue
> >        waitForPythonFinish();
> >        List<Out> outputs = drainQueue();
> >        outputs.forEach(out -> collect(out));
> >    }
> > }
> > ```
> > This worked well in older Flink versions, but when they recently attempted
> > an upgrade, the 'flush' logic in the legacy close() method of
> > `TableFunction` no longer worked properly.
> >
> > Before proposing the FLIP, I also considered a `flush()` extension on
> > `org.apache.flink.api.common.functions.Function`, because some SQL
> > operators are also related, but it is currently not included in the scope
> > of this FLIP; maybe we can discuss it in another thread.
> >
> > I hope this helps explain the motivation; your comments are welcome!
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Wed, 14 Sep 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> >> Hi Lincoln,
> >>
> >> Thanks for the proposal. Have you seen the old discussion about adding this
> >> `finish()` method? [1] We didn't add it to UDFs, as we didn't see a
> >> motivation (maybe we have missed something), and at the same time it
> >> wasn't that easy. A plain `finish()` wouldn't be enough. Users would need a
> >> way to output records from the `finish()` call, so it would have to be
> >> typed with the user record (`finish(Collector<T> output)`). On the other
> >> hand, we couldn't find an example where a user would actually need the
> >> `finish()` call in a UDF, as it seemed to us it makes sense only for
> >> operators/functions that are buffering records. Note that back then, during
> >> the discussion, we were referring to this method as `flush()` or `drain()`.
> >>
> >> Can you shed some more light and provide more details on the exact
> >> motivating example behind this proposal?
> >>
> >> Best,
> >> Piotrek
> >>
> >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
> >>
> >> On Wed, 14 Sep 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>>    I’d like to open a discussion on FLIP-260[1]: expose finish method for
> >>> UserDefinedFunction. This gives users who rely on finish logic in the
> >>> legacy close() method (< 1.14) a chance to migrate to the new finish()
> >>> method.
> >>>    The task lifecycle was changed in FLINK-22972[2]: a new finish() phase
> >>> was introduced (extracting the ‘finish’ part out of ‘close’) and the
> >>> dispose() method was removed. This change was also applied in the table
> >>> module (e.g., `AbstractMapBundleOperator` for mini-batch operation) but did
> >>> not cover UserDefinedFunction, which only exposes the open() and close()
> >>> APIs for custom usage. Users who rely on the legacy close() API may
> >>> encounter wrong results or runtime errors after upgrading to the new
> >>> version. Strictly speaking, it is a bug caused by the breaking change, but
> >>> because the fix requires a public API change, we propose this FLIP.
> >>>
> >>>    Looking forward to your comments or feedback.
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
>
