Hi everyone,

Thanks for all your feedback! I have started a vote for this FLIP [1]; please vote there, or ask additional questions here [2].

[1] https://lists.apache.org/thread/nr9wwf98fkw1tk7ycgbcfjjo5g4x8pmz
[2] https://lists.apache.org/thread/m9hj60p3mntyctkbxrksm8l4d0s4q9dw

Best,
Lincoln Lee

On Tue, Sep 20, 2022 at 18:21, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Fine by me. Thanks for driving this Lincoln :)
>
> Best, Piotrek
>
> On Tue, Sep 20, 2022 at 09:06, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi all,
> > I'll start a vote if there are no more objections by this
> > Thursday (9.22). Looking forward to your feedback!
> >
> > [1] FLIP-260:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > [2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc
> >
> > Best,
> > Lincoln Lee
> >
> > On Mon, Sep 19, 2022 at 17:38, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >
> > > Hi Jingsong,
> > > Thank you for participating in this discussion! For the method name, I
> > > think we should follow the new finish() method in `StreamOperator`;
> > > `BoundedOneInput` might be removed in the future, as discussed before [1].
> > >
> > > [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > > On Mon, Sep 19, 2022 at 10:13, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > >> +1 to adding a `finish()` method to `TableFunction` only.
> > >>
> > >> Can we use `endInput`, just like `BoundedOneInput`?
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee <lincoln.8...@gmail.com>
> > >> wrote:
> > >> >
> > >> > Hi Dawid, Piotr,
> > >> > I agree with you that we should add the finish() method to `TableFunction` only.
> > >> > The other `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`,
> > >> > `TableAggregateFunction`) do not necessarily need the finish
> > >> > method (they cannot emit records in the legacy close() method).
> > >> >
> > >> > A `TableFunction` is used to correlate with the left table/stream. The
> > >> > following example shows a case where the user selects columns only from the
> > >> > correlated 'FeatureTF' (no left-table column is selected):
> > >> > ```
> > >> > SELECT feature1, feature2, feature3
> > >> > FROM MyTable t1
> > >> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2, feature3) ON TRUE
> > >> > ```
> > >> > 'FeatureTF' can do some flushing work in the legacy close() method, and
> > >> > this doesn't break any SQL semantics, so I don't see any reason to
> > >> > prevent users from doing flushing work in the new finish() method. I've
> > >> > updated the FLIP doc to limit the change to `TableFunction` only [1].
> > >> >
> > >> > For the more powerful `ProcessFunction`, I'd like to share some thoughts:
> > >> > there are indeed requirements for advanced usage in Table/SQL, even for a
> > >> > further UD-Operator, e.g., a UD-Join for user-controlled join logic that
> > >> > cannot simply be expressed in SQL. This is an interesting topic, and I
> > >> > expect more discussion on it.
> > >> >
> > >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > >> >
> > >> > Best,
> > >> > Lincoln Lee
> > >> >
> > >> > On Thu, Sep 15, 2022 at 22:39, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> >
> > >> > > Hi Dawid, Lincoln,
> > >> > >
> > >> > > I would tend to agree with Dawid. It seems to me like `TableFunction` is
> > >> > > the one that needs to be taken care of. Other types of
> > >> > > `UserDefinedFunction` wouldn't be able to emit anything from `finish()`
> > >> > > even if we added it.
> > >> > > And if we added `finish(Collector<T> out)` to them, it
> > >> > > would create the same problems (how to pass the output type) that prevented
> > >> > > us from adding `finish()` to all functions in the DataStream API.
> > >> > >
> > >> > > However, I'm not sure what the long-term solution for the Table API
> > >> > > should be. For the DataStream API we wanted to provide a new, better and
> > >> > > more powerful `ProcessFunction` for all of the unusual use cases that
> > >> > > currently require the use of the `StreamOperator` API instead of
> > >> > > `DataStream` functions. I don't know what an alternative in the Table API
> > >> > > would be.
> > >> > >
> > >> > > Dawid, who do you think we should ping from the Table API/SQL teams to
> > >> > > chip in?
> > >> > >
> > >> > > Best,
> > >> > > Piotrek
> > >> > >
> > >> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > >> > >
> > >> > > > Hey Lincoln,
> > >> > > >
> > >> > > > Thanks for opening the discussion.
> > >> > > >
> > >> > > > To be honest, I am not convinced that emitting from close() is a
> > >> > > > contract that was envisioned and thus should be maintained. As far as I
> > >> > > > can see, it affects only the TableFunction, because it has the
> > >> > > > collect method. None of the other UDFs (ScalarFunction,
> > >> > > > AggregateFunction) have a means to emit records from close().
> > >> > > >
> > >> > > > I am also not sure what the consequences would be for the interplay
> > >> > > > with other operators that expect TableFunction to emit only when eval
> > >> > > > is called. I am not sure whether there are any.
> > >> > > >
> > >> > > > If it is a thing that we are certain we want to support, I'd be much
> > >> > > > more comfortable adding finish() to the TableFunction instead. I would be
> > >> > > > happy to hear opinions from the Table API folks.
> > >> > > >
> > >> > > > Best,
> > >> > > >
> > >> > > > Dawid
> > >> > > >
> > >> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
> > >> > > > > Thanks @Piotr for your valuable input!
> > >> > > > >
> > >> > > > > I did a quick read of the previous discussion you mentioned. It seems my
> > >> > > > > FLIP title doesn't give a clear scope and causes some confusion. If my
> > >> > > > > understanding is correct, the UDFs in your context are the user-implemented
> > >> > > > > `org.apache.flink.api.common.functions.Function`s, while the
> > >> > > > > `UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table
> > >> > > > > module and is located in `org.apache.flink.table.functions`.
> > >> > > > >
> > >> > > > > Here's a use case we've met recently (which is indeed the motivation
> > >> > > > > for proposing this):
> > >> > > > > one of our users implemented an
> > >> > > > > `org.apache.flink.table.functions.TableFunction`; the simplified
> > >> > > > > pseudo-code is as below:
> > >> > > > >
> > >> > > > > ```
> > >> > > > > class XFunction extends TableFunction<Out> {
> > >> > > > >
> > >> > > > >   open(FunctionContext context) {
> > >> > > > >     initMemQueue();
> > >> > > > >     initPythonProc();
> > >> > > > >   }
> > >> > > > >
> > >> > > > >   eval(In in) {
> > >> > > > >     queue.offer(in);
> > >> > > > >     Out out = queue.poll();
> > >> > > > >     if (out != null) {
> > >> > > > >       collect(out);
> > >> > > > >     }
> > >> > > > >   }
> > >> > > > >
> > >> > > > >   close() {
> > >> > > > >     // flush: emit everything still buffered when the input ends
> > >> > > > >     waitForPythonFinish();
> > >> > > > >     List<Out> outputs = drainQueue();
> > >> > > > >     outputs.forEach(out -> collect(out));
> > >> > > > >   }
> > >> > > > > }
> > >> > > > > ```
> > >> > > > > It worked well in lower Flink versions until they recently attempted an
> > >> > > > > upgrade: the 'flush' logic in the legacy close method of `TableFunction`
> > >> > > > > cannot work properly
> > >> > > > > any more.
> > >> > > > >
> > >> > > > > Before proposing the FLIP, I also considered the `flush()` extension on
> > >> > > > > `org.apache.flink.api.common.functions.Function`, because some SQL
> > >> > > > > operators are also related, but it is currently not included in the scope
> > >> > > > > of this FLIP; maybe we can discuss it in another thread.
> > >> > > > >
> > >> > > > > I hope this helps explain the reason, and I welcome your comments here!
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > Lincoln Lee
> > >> > > > >
> > >> > > > > On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> > > > >
> > >> > > > >> Hi Lincoln,
> > >> > > > >>
> > >> > > > >> Thanks for the proposal. Have you seen the old discussion about adding
> > >> > > > >> this `finish()` method? [1] We didn't add it to UDFs, as we didn't see a
> > >> > > > >> motivation (maybe we have missed something), and at the same time it
> > >> > > > >> wasn't that easy. Plain `finish()` wouldn't be enough. Users would need a
> > >> > > > >> way to output records from the `finish()` call, so it would have to be
> > >> > > > >> typed with the user record (`finish(Collector<T> output)`). On the other
> > >> > > > >> hand, we couldn't find an example where a user would actually need the
> > >> > > > >> `finish()` call in a UDF, as it seemed to us it only makes sense for
> > >> > > > >> operators/functions that are buffering records. Note that back then,
> > >> > > > >> during the discussion, we were referring to this method as `flush()` or
> > >> > > > >> `drain()`.
> > >> > > > >>
> > >> > > > >> Can you shed some more light and provide more details on the exact
> > >> > > > >> motivating example behind this proposal?
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Piotrek
> > >> > > > >>
> > >> > > > >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
> > >> > > > >>
> > >> > > > >> On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >> > > > >>
> > >> > > > >>> Hello everyone,
> > >> > > > >>>
> > >> > > > >>> I'd like to open a discussion on FLIP-260 [1]: expose the finish method
> > >> > > > >>> for UserDefinedFunction. This gives users who rely on finish logic
> > >> > > > >>> in the legacy close() method (< 1.14) a chance to migrate to the new
> > >> > > > >>> finish() method.
> > >> > > > >>>
> > >> > > > >>> The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase
> > >> > > > >>> was introduced (extracting the 'finish' part out of 'close') and
> > >> > > > >>> the dispose() method was removed. This change was also made in the table
> > >> > > > >>> module (e.g., `AbstractMapBundleOperator` for mini-batch operation) but
> > >> > > > >>> did not cover UserDefinedFunction, which only exposes the open() and
> > >> > > > >>> close() APIs for custom usage. Users who rely on the legacy close() API
> > >> > > > >>> may encounter wrong results or suffer runtime errors after upgrading to
> > >> > > > >>> the new version. Strictly speaking, it is a bug caused by the breaking
> > >> > > > >>> change, but since the fix involves a public API change, we propose this
> > >> > > > >>> FLIP.
> > >> > > > >>>
> > >> > > > >>> Looking forward to your comments or feedback.
> > >> > > > >>>
> > >> > > > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> > >> > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
> > >> > > > >>>
> > >> > > > >>> Best,
> > >> > > > >>> Lincoln Lee
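
For anyone reading the thread later, the lifecycle being discussed can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink code: `SketchTableFunction`, `BufferingFunction` and `FinishLifecycleSketch` are hypothetical names, and the base class merely mimics an open/eval/finish/close call order so the example runs without any Flink dependency. The point it tries to show is that buffered records are emitted from finish() while close() only releases resources.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical driver: calls the lifecycle methods in the order discussed in the thread.
class FinishLifecycleSketch {
    static List<String> run(List<String> inputs, boolean callFinish) {
        List<String> emitted = new ArrayList<>();
        BufferingFunction fn = new BufferingFunction(2);
        fn.setCollector(emitted::add);
        fn.open();
        for (String in : inputs) {
            fn.eval(in);
        }
        if (callFinish) {
            fn.finish(); // end of input: flush whatever is still buffered
        }
        fn.close(); // resource cleanup only, nothing is emitted here
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"), true));
        System.out.println(run(List.of("a", "b", "c"), false));
    }
}

// Hypothetical stand-in for a table function base class; NOT Flink's TableFunction.
abstract class SketchTableFunction<IN, OUT> {
    private Consumer<OUT> collector;

    void setCollector(Consumer<OUT> collector) {
        this.collector = collector;
    }

    protected void collect(OUT row) {
        collector.accept(row);
    }

    void open() {}

    abstract void eval(IN in);

    void finish() {}

    void close() {}
}

// Buffers up to `delay` records and emits the oldest one once the buffer is full.
class BufferingFunction extends SketchTableFunction<String, String> {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int delay;

    BufferingFunction(int delay) {
        this.delay = delay;
    }

    @Override
    void eval(String in) {
        queue.offer(in);
        if (queue.size() > delay) {
            collect(queue.poll());
        }
    }

    @Override
    void finish() {
        // Drain the buffer; this is the work the legacy close() used to do.
        while (!queue.isEmpty()) {
            collect(queue.poll());
        }
    }

    @Override
    void close() {
        // Release resources only; records still buffered here are dropped.
        queue.clear();
    }
}
```

Calling `run` with `callFinish` set to false drops the records that are still buffered when the input ends, which is the kind of wrong result described at the start of the thread.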