Hi all, I'll start a vote if there are no further objections by this Thursday (9.22). Looking forward to your feedback!
[1] FLIP-260: https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc

Best,
Lincoln Lee

On Mon, Sep 19, 2022 at 17:38, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jingsong,

Thank you for participating in this discussion! For the method name, I think we should follow the new finish() method in `StreamOperator`; `BoundedOneInput` might be removed in the future, as discussed before [1].

[1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb

Best,
Lincoln Lee

On Mon, Sep 19, 2022 at 10:13, Jingsong Li <jingsongl...@gmail.com> wrote:

+1 to adding the `finish()` method to `TableFunction` only.

Can we use `endInput`, just like `BoundedOneInput`?

Best,
Jingsong

On Fri, Sep 16, 2022 at 11:54 PM, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Dawid, Piotr,

I agree with you that we should add the finish() method to `TableFunction` only. Other `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`, `TableAggregateFunction`) do not necessarily need a finish method (they cannot emit records in the legacy close() method).

A `TableFunction` is used to correlate with the left table/stream. The following example shows a case where the user only selects columns from the correlated 'FeatureTF' (no column of the left table is selected):

```
SELECT feature1, feature2, feature3
FROM MyTable t1
JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2, feature3) ON TRUE
```

Here 'FeatureTF' can do some flushing work in the legacy close() method without breaking any SQL semantics, so I don't see any reason to forbid users from doing the same flushing work in the new finish() method. I've updated the FLIP doc to limit the change to `TableFunction` only [1].
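The proposal here is to give `TableFunction` a `finish()` hook, following `StreamOperator#finish()`, so that flushing work moves out of close(). A minimal sketch of that idea in plain Java, assuming a simplified lifecycle (open, eval per row, finish at end of input, close): `SketchTableFunction`, `BufferingFeatureTF` and `SketchDriver` are hypothetical names, not Flink classes, and Flink's real `TableFunction` is only loosely mirrored (a `collect()` that forwards a row downstream).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a table function; NOT Flink's TableFunction.
abstract class SketchTableFunction<T> {
    private Consumer<T> collector;

    void setCollector(Consumer<T> collector) { this.collector = collector; }

    // Loosely mirrors TableFunction#collect: forwards one row downstream.
    protected final void collect(T row) { collector.accept(row); }

    void open() {}

    abstract void eval(String input);

    // The hook proposed in FLIP-260: last point where buffered rows may be emitted.
    void finish() {}

    // Should only release resources; emitting rows here is the legacy pattern.
    void close() {}
}

// Hypothetical function that buffers rows and flushes in batches,
// with the final flush done in finish() instead of close().
class BufferingFeatureTF extends SketchTableFunction<String> {
    private static final int BATCH = 2;
    private final List<String> buffer = new ArrayList<>();

    @Override
    void eval(String input) {
        buffer.add("feature(" + input + ")");
        if (buffer.size() >= BATCH) {
            flush();
        }
    }

    @Override
    void finish() { flush(); }

    @Override
    void close() { buffer.clear(); }

    private void flush() {
        buffer.forEach(this::collect);
        buffer.clear();
    }
}

class SketchDriver {
    // Simulated bounded run: open, eval for every input, finish at end of input, close.
    static List<String> run(SketchTableFunction<String> fn, List<String> inputs) {
        List<String> downstream = new ArrayList<>();
        fn.setCollector(downstream::add);
        fn.open();
        for (String in : inputs) {
            fn.eval(in);
        }
        fn.finish();
        fn.close();
        return downstream;
    }
}
```

`SketchDriver.run(new BufferingFeatureTF(), List.of("a", "b", "c"))` returns `[feature(a), feature(b), feature(c)]`: the first two rows are emitted from eval() once the batch is full, and the last one only because finish() flushes the remainder.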

For the more powerful `ProcessFunction`, I'd like to share some thoughts: there are indeed requirements for advanced usage in Table/SQL, even for a further UD-Operator, e.g., a UD-Join for user-controlled join logic that cannot simply be expressed in SQL. This is an interesting topic, and I look forward to more discussion on it.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction

Best,
Lincoln Lee

On Thu, Sep 15, 2022 at 22:39, Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Dawid, Lincoln,

I would tend to agree with Dawid. It seems to me that `TableFunction` is the one that needs to be taken care of. Other types of `UserDefinedFunction` wouldn't be able to emit anything from `finish()` even if we added it. And if we added `finish(Collector<T> out)` to them, it would create the same problems (how to pass the output type) that prevented us from adding `finish()` to all functions in the DataStream API.

However, I'm not sure what the long-term solution for the Table API should be. For the DataStream API we wanted to provide a new, better and more powerful `ProcessFunction` for all of the unusual use cases that currently require the `StreamOperator` API instead of `DataStream` functions. I don't know what the alternative would be in the Table API.

Dawid, who do you think we should ping from the Table API/SQL teams to chip in?

Best,
Piotrek

On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hey Lincoln,

Thanks for opening the discussion.

To be honest, I am not convinced that emitting from close() is a contract that was envisioned and thus should be maintained.
As far as I can see, it only affects the TableFunction, because it has the collect method. None of the other UDFs (ScalarFunction, AggregateFunction) have a means to emit records from close().

To be honest, I am not sure what the consequences would be for the interplay with other operators that expect a TableFunction to emit only when eval is called. I'm not sure whether there are any.

If this is something we are certain we want to support, I'd be much more comfortable adding finish() to the TableFunction instead. I would be happy to hear opinions from the Table API folks.

Best,

Dawid

On 14/09/2022 15:55, Lincoln Lee wrote:

Thanks @Piotr for your valuable input!

I did a quick read of the previous discussion you mentioned. It seems my FLIP title doesn't give a clear scope and has caused some confusion. If my understanding is correct, the UDFs in your context are the user-implemented `org.apache.flink.api.common.functions.Function`s, while the `UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table module, located in `org.apache.flink.table.functions`.

Here's a use case we ran into recently (which is indeed the motivation for this proposal): one of our users implemented an `org.apache.flink.table.functions.TableFunction`; the simplified pseudo-code is as follows:

```
class XFunction extends TableFunction<Out> {

    @Override
    public void open(FunctionContext context) throws Exception {
        initMemQueue();
        initPythonProc();
    }

    public void eval(In in) {
        queue.offer(in);
        Out out = queue.poll();
        if (out != null) {
            collect(out);
        }
    }

    @Override
    public void close() throws Exception {
        // legacy 'flush': wait for the Python process, then emit the remaining results
        waitForPythonFinish();
        List<Out> outputs = drainQueue();
        outputs.forEach(out -> collect(out));
    }
}
```

It worked well in older Flink versions, but when they recently attempted to upgrade, the 'flush' logic in the legacy close() method of the `TableFunction` no longer worked properly.

Before proposing the FLIP, I also considered a `flush()` extension on `org.apache.flink.api.common.functions.Function`, because some SQL operators are also related, but it is currently not in the scope of this FLIP; maybe we can discuss it in another thread.

I hope this helps explain the motivation, and your comments are welcome!

Best,
Lincoln Lee

On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Lincoln,

Thanks for the proposal. Have you seen the old discussion about adding this `finish()` method?
[1] We didn't add it to UDFs, as we didn't see a motivation (maybe we have missed something), and at the same time it wasn't that easy. A plain `finish()` wouldn't be enough. Users would need a way to output records from the `finish()` call, so it would have to be typed with the user record (`finish(Collector<T> output)`). On the other hand, we couldn't find an example where a user would actually need the `finish()` call in a UDF, as it seemed to us that it only makes sense for operators/functions that buffer records. Note that back then, during the discussion, we were referring to this method as `flush()` or `drain()`.

Can you shed some more light and provide more details on the exact motivating example behind this proposal?

Best,
Piotrek

[1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp

On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hello everyone,

I'd like to open a discussion on FLIP-260 [1]: expose a finish method for UserDefinedFunction. This gives users who rely on finish logic in the legacy close() method (< 1.14) a chance to migrate to the new finish() method.

The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase was introduced (extracting the 'finish' part out of 'close') and the dispose() method was removed.
This change was also applied in the table module (e.g., `AbstractMapBundleOperator` for mini-batch operations), but it did not cover UserDefinedFunction, which only exposes the open() and close() APIs for custom usage. Users who rely on the legacy close() API may get wrong results or runtime errors after upgrading to the new version. Strictly speaking, this is a bug caused by the breaking change, but because fixing it requires a public API change, we propose this FLIP.

Looking forward to your comments or feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
[2] https://issues.apache.org/jira/browse/FLINK-22972

Best,
Lincoln Lee
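To make the failure mode described in this proposal concrete, here is a toy model in plain Java. It assumes a simplified reading of the post-FLINK-22972 ordering: at end of input every operator in the chain is finished first, and only afterwards are they closed. `ModelSink`, `ModelFlushingFunction` and `LifecycleModel` are hypothetical names, not Flink runtime classes, and the model just counts records that reach an already finished sink rather than reproducing Flink's actual symptoms (wrong results or runtime errors).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical downstream operator; NOT Flink's StreamOperator.
class ModelSink {
    final List<String> accepted = new ArrayList<>();
    int lateRecords = 0;
    private boolean finished = false;

    void process(String record) {
        // Once finished, the sink's final result is already produced,
        // so a record arriving later cannot become part of it.
        if (finished) {
            lateRecords++;
        } else {
            accepted.add(record);
        }
    }

    void finish() { finished = true; }
}

// Hypothetical function that buffers all rows and flushes them in one lifecycle hook.
class ModelFlushingFunction {
    private final List<String> buffer = new ArrayList<>();
    private final ModelSink out;
    private final boolean flushInFinish;

    ModelFlushingFunction(ModelSink out, boolean flushInFinish) {
        this.out = out;
        this.flushInFinish = flushInFinish;
    }

    void eval(String in) { buffer.add(in.toUpperCase()); }

    // New pattern: flush while downstream can still accept records.
    void finish() { if (flushInFinish) flush(); }

    // Legacy pattern: flush in close().
    void close() { if (!flushInFinish) flush(); }

    private void flush() {
        buffer.forEach(out::process);
        buffer.clear();
    }
}

class LifecycleModel {
    // Assumed ordering in this model: finish() every operator head to tail, then close().
    static ModelSink run(boolean flushInFinish, List<String> inputs) {
        ModelSink sink = new ModelSink();
        ModelFlushingFunction fn = new ModelFlushingFunction(sink, flushInFinish);
        for (String in : inputs) {
            fn.eval(in);
        }
        fn.finish();
        sink.finish();
        fn.close();
        return sink;
    }
}
```

With the legacy pattern, `LifecycleModel.run(false, List.of("a", "b"))` leaves `accepted` empty and counts two late records; with `run(true, ...)` both `A` and `B` are accepted. Moving the flush from close() into a finish() hook is the migration this FLIP aims to make possible for `TableFunction`.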