Hi Dawid, Lincoln,

I would tend to agree with Dawid. It seems to me that `TableFunction` is the one that needs to be taken care of. Other types of `UserDefinedFunction` wouldn't be able to emit anything from `finish()` even if we added it. And if we added `finish(Collector<T> out)` to them, it would create the same problem (how to pass the output type) that prevented us from adding `finish()` to all functions in the DataStream API.
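For illustration, here is a minimal plain-Java sketch of the typing problem. `Collector` below is a stand-in that only mirrors the `collect(T)` method of `org.apache.flink.util.Collector`. `FinishWithOutput` and `BufferingFunction` are hypothetical names, not Flink APIs. A function that buffers input can only flush it at end of input if its `finish()` receives a `Collector<T>` bound to its output type. A parameterless `finish()` has nowhere to send the records.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring only collect(T) from org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical interface (not a Flink API): to emit anything, finish() must be
// typed with the function's output type T.
interface FinishWithOutput<T> {
    void finish(Collector<T> out);
}

// Hypothetical buffering function that holds records until end of input.
class BufferingFunction implements FinishWithOutput<String> {
    private final List<String> buffer = new ArrayList<>();

    void process(String value) {
        buffer.add(value); // buffer instead of emitting immediately
    }

    @Override
    public void finish(Collector<String> out) {
        for (String value : buffer) {
            out.collect(value); // flush the buffered records
        }
        buffer.clear();
    }
}
```

Usage: after `process("a")` and `process("b")`, calling `finish(sink::add)` with a `List<String> sink` drains both buffered records into `sink`.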
However, I'm not sure what the long-term solution for the Table API should be. For the DataStream API we wanted to provide a new, better and more powerful `ProcessFunction` for all of the unusual use cases that currently require the `StreamOperator` API instead of `DataStream` functions. I don't know what the alternative would be in the Table API. Dawid, who do you think we should ping from the Table API/SQL teams to chip in?

Best,
Piotrek

On Thu, 15 Sep 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hey Lincoln,
>
> Thanks for opening the discussion.
>
> To be honest, I am not convinced that emitting from close() is a contract that was ever envisioned and should thus be maintained. As far as I can see, it only affects the TableFunction, because it has the collect method. None of the other UDFs (ScalarFunction, AggregateFunction) have a means to emit records from close().
>
> To be honest, I am not sure what the consequences would be for the interplay with other operators that expect a TableFunction to emit only when eval is called. I'm not sure whether there are any.
>
> If it is something we are certain we want to support, I'd be much more comfortable adding finish() to the TableFunction instead. I would be happy to hear opinions from the Table API folks.
>
> Best,
>
> Dawid
>
> On 14/09/2022 15:55, Lincoln Lee wrote:
> > Thanks @Piotr for your valuable input!
> >
> > I did a quick read of the previous discussion you mentioned. It seems my FLIP title doesn't make the scope clear and caused some confusion. If my understanding is correct, the UDFs in your context are the user-implemented `org.apache.flink.api.common.functions.Function`s, while the `UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table module, located in `org.apache.flink.table.functions`.
> >
> > Here's a use case we've met recently (which is indeed the motivation for this proposal): one of our users implemented an `org.apache.flink.table.functions.TableFunction`; the simplified pseudo-code is below:
> >
> > ```
> > public class XFunction extends TableFunction<Out> {
> >
> >     @Override
> >     public void open(FunctionContext context) throws Exception {
> >         initMemQueue();    // user-defined helpers, elided
> >         initPythonProc();
> >     }
> >
> >     public void eval(In in) {
> >         queue.offer(in);
> >         Out out = queue.poll();
> >         if (out != null) {
> >             collect(out);
> >         }
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         // legacy 'flush': emit whatever is still buffered
> >         waitForPythonFinish();
> >         List<Out> outputs = drainQueue();
> >         outputs.forEach(this::collect);
> >     }
> > }
> > ```
> >
> > It worked well in older Flink versions, until they recently attempted an upgrade: the 'flush' logic in the legacy close method of the `TableFunction` no longer works properly.
> >
> > Before proposing the FLIP, I also considered a `flush()` extension of `org.apache.flink.api.common.functions.Function`, because some SQL operators are also related, but that is not currently in the scope of this FLIP; maybe we can discuss it in another thread.
> >
> > I hope this helps explain the reason, and your comments are welcome!
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Wed, 14 Sep 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> >> Hi Lincoln,
> >>
> >> Thanks for the proposal. Have you seen the old discussion about adding this `finish()` method? [1] We didn't add it to UDFs, as we didn't see a motivation (maybe we missed something), and at the same time it wasn't that easy. A plain `finish()` wouldn't be enough. Users would need a way to output records from the `finish()` call, so it would have to be typed with the user record (`finish(Collector<T> output)`). On the other hand, we couldn't find an example where a user would actually need the `finish()` call in a UDF, as it seemed to us that it only makes sense for operators/functions that buffer records. Note that back then, during the discussion, we were referring to this method as `flush()` or `drain()`.
> >>
> >> Can you shed some more light and provide more details on the exact motivating example behind this proposal?
> >>
> >> Best,
> >> Piotrek
> >>
> >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
> >>
> >> On Wed, 14 Sep 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>> I'd like to open a discussion on FLIP-260 [1]: expose the finish method for UserDefinedFunction. This gives users who rely on finish logic in the legacy close() method (< 1.14) a chance to migrate to the new finish() method.
> >>>
> >>> The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase was introduced (extracting the 'finish' part out of 'close') and the dispose() method was removed. This change was also made in the table module (e.g., `AbstractMapBundleOperator` for mini-batch operation) but did not cover UserDefinedFunction, which only exposes the open() and close() APIs for custom usage. Users who rely on the legacy close() API may get wrong results or run into runtime errors after upgrading to the new version. Strictly speaking, it is a bug caused by the breaking change, but because fixing it requires a public API change, we are proposing this FLIP.
> >>>
> >>> Looking forward to your comments or feedback.
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
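For illustration, here is a simplified model of the lifecycle change discussed in this thread. It is written in plain Java so it runs without Flink on the classpath. `SketchTableFunction`, `SketchDriver`, `QueueingFunction` and `LegacyQueueingFunction` are hypothetical names.

The driver follows the open → eval → finish → close order introduced by FLINK-22972. It also assumes that end of input has been forwarded downstream once `finish()` returns, so records collected afterwards are rejected. That is a modelling assumption, not a description of how the Flink runtime actually fails.

`QueueingFunction` stands in for XFunction, with an in-memory queue in place of the Python process, and flushes in a `finish()` hook like the one FLIP-260 proposes. `LegacyQueueingFunction` flushes in `close()`, as XFunction did.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a TableFunction-like UDF (not Flink's class).
abstract class SketchTableFunction<I, T> {
    private Consumer<T> out;

    final void setOutput(Consumer<T> out) { this.out = out; }

    protected final void collect(T record) { out.accept(record); }

    void open() {}

    abstract void eval(I in);

    void finish() {} // the kind of hook FLIP-260 proposes to expose

    void close() {}
}

// Hypothetical driver using the post-FLINK-22972 order: open -> eval* -> finish -> close.
// Modelling assumption: after finish() returns, end of input has been sent downstream,
// so any record collected later (e.g. from close()) is rejected.
final class SketchDriver {
    static <I, T> List<T> run(SketchTableFunction<I, T> fn, List<I> input) {
        List<T> results = new ArrayList<>();
        boolean[] ended = {false};
        fn.setOutput(record -> {
            if (ended[0]) {
                throw new IllegalStateException("record emitted after end of input: " + record);
            }
            results.add(record);
        });
        fn.open();
        for (I in : input) {
            fn.eval(in);
        }
        fn.finish();
        ended[0] = true; // end of input forwarded downstream
        fn.close();
        return results;
    }
}

// Hypothetical analogue of XFunction: results are released one record late,
// so one record is still buffered when the input ends.
class QueueingFunction extends SketchTableFunction<Integer, Integer> {
    protected final Queue<Integer> queue = new ArrayDeque<>();

    @Override
    void eval(Integer in) {
        queue.offer(in);
        if (queue.size() > 1) {
            collect(queue.poll());
        }
    }

    @Override
    void finish() {
        while (!queue.isEmpty()) {
            collect(queue.poll()); // flush at end of input, before close()
        }
    }
}

// The pre-1.14 pattern: flushing from close() instead of finish().
class LegacyQueueingFunction extends QueueingFunction {
    @Override
    void finish() {} // nothing flushed at end of input

    @Override
    void close() {
        while (!queue.isEmpty()) {
            collect(queue.poll()); // rejected by the driver's end-of-input check
        }
    }
}
```

In this model, `SketchDriver.run(new QueueingFunction(), List.of(1, 2, 3))` returns `[1, 2, 3]`. The legacy variant fails with an `IllegalStateException` when `close()` tries to emit the last buffered record.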