Hi all,
   I'll start a vote on FLIP-260 [1] (see also the PoC [2]) if there are no
more objections by this Thursday (9.22). Looking forward to your feedback!

[1] FLIP-260:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc

Best,
Lincoln Lee


On Mon, Sep 19, 2022 at 17:38, Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jingsong,
>    Thank you for participating in this discussion! For the method name, I
> think we should follow the new finish() method in `StreamOperator`, since
> `BoundedOneInput` might be removed in the future, as discussed before [1].
>
> [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
>
> Best,
> Lincoln Lee
>
>
> On Mon, Sep 19, 2022 at 10:13, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> +1 to adding a `finish()` method to `TableFunction` only.
>>
>> Can we name it `endInput`, just like in `BoundedOneInput`?
>>
>> Best,
>> Jingsong
>>
>> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
>> >
>> > Hi Dawid, Piotr,
>> >    I agree with you that we should add the finish() method to
>> > `TableFunction` only. Other `UserDefinedFunction`s (`ScalarFunction`,
>> > `AggregateFunction`, `TableAggregateFunction`) do not necessarily need a
>> > finish() method (they cannot emit records in the legacy close() method).
>> >
>> > A `TableFunction` is used to correlate with the left table/stream. The
>> > following example shows a case where the user only selects columns from
>> > the correlated 'FeatureTF' (no column of the left table is selected):
>> > ```
>> > SELECT feature1, feature2, feature3
>> > FROM MyTable t1
>> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2, feature3)
>> > ON TRUE
>> > ```
>> > Here 'FeatureTF' can do some flushing work in the legacy close() method
>> > without breaking any SQL semantics, so I don't see any reason to forbid
>> > users from doing such flushing work in the new finish() method. I've
>> > updated the FLIP doc to limit the change to `TableFunction` only [1].
>> >
>> > For the more powerful `ProcessFunction`, I'd like to share some thoughts:
>> > there are indeed requirements for advanced usage in Table/SQL, and even
>> > for a further UD-Operator, e.g., a UD-Join for user-controlled join logic
>> > that cannot simply be expressed in SQL. This is an interesting topic, and
>> > I look forward to more discussion on it.
>> >
>> >
>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > On Thu, Sep 15, 2022 at 22:39, Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >
>> > > Hi Dawid, Lincoln,
>> > >
>> > > I would tend to agree with Dawid. It seems to me that `TableFunction` is
>> > > the one that needs to be taken care of. Other types of
>> > > `UserDefinedFunction` wouldn't be able to emit anything from `finish()`
>> > > even if we added it. And if we added `finish(Collector<T> out)` to them,
>> > > it would create the same problems (how to pass the output type) that
>> > > prevented us from adding `finish()` to all functions in the DataStream
>> > > API.
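A minimal Java sketch of the typing argument above, under stated assumptions: `SketchCollector`, `SketchTableFunction`, `SketchScalarFunction` and `BatchingFunction` are hypothetical stand-ins, not Flink's API. A table-function-like class already fixes its output type `T` and holds a collector, so a no-argument `finish()` can still emit; a scalar-function-like class only returns values from `eval` and would need something like `finish(Collector<T> out)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink code: why a plain finish() suffices for table-function-like UDFs. */
public class FinishTypingSketch {

    /** Hypothetical output channel, typed by the record type T. */
    public interface SketchCollector<T> {
        void collect(T record);
    }

    /** Table-function-like base: T is fixed by the class and the runtime injects the collector. */
    public abstract static class SketchTableFunction<T> {
        private SketchCollector<T> collector;

        public final void setCollector(SketchCollector<T> collector) {
            this.collector = collector;
        }

        /** Emits a record; usable from eval() as well as from finish(). */
        protected final void collect(T record) {
            collector.collect(record);
        }

        /** No parameter needed: the output type and channel are already known. */
        public void finish() {}
    }

    /** Scalar-function-like base: the only output is the return value of eval(). */
    public abstract static class SketchScalarFunction<IN, OUT> {
        public abstract OUT eval(IN input);
        // A finish() here would have nowhere to emit to; it would have to become
        // finish(SketchCollector<OUT> out), so the runtime must supply a typed collector.
    }

    /** Example table function that emits its input in batches of `batchSize`. */
    public static class BatchingFunction extends SketchTableFunction<List<String>> {
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();

        public BatchingFunction(int batchSize) {
            this.batchSize = batchSize;
        }

        public void eval(String value) {
            buffer.add(value);
            if (buffer.size() == batchSize) {
                collect(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        @Override
        public void finish() {
            // Flush the incomplete last batch; without this it would be lost.
            if (!buffer.isEmpty()) {
                collect(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> out = new ArrayList<>();
        BatchingFunction f = new BatchingFunction(2);
        f.setCollector(out::add);
        for (String s : List.of("a", "b", "c")) {
            f.eval(s);
        }
        f.finish();
        System.out.println(out); // [[a, b], [c]]
    }
}
```

The design point is that the record type lives on the class (`SketchTableFunction<T>`), so `finish()` needs no extra type information at the call site.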
>> > >
>> > > However, I'm not sure what the long-term solution for the Table API
>> > > should be. For the DataStream API, we wanted to provide a new, better and
>> > > more powerful `ProcessFunction` for all of the unusual use cases that
>> > > currently require the use of the `StreamOperator` API instead of
>> > > `DataStream` functions. I don't know what the alternative would be in the
>> > > Table API.
>> > >
>> > > Dawid, who do you think we should ping from the Table API/SQL teams to
>> > > chip in?
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dwysakow...@apache.org>
>> > > wrote:
>> > >
>> > > > Hey Lincoln,
>> > > >
>> > > > Thanks for opening the discussion.
>> > > >
>> > > > To be honest, I am not convinced that emitting from close() is a
>> > > > contract that was envisioned and thus should be maintained. As far as
>> > > > I can see, it only affects the TableFunction, because it has the
>> > > > collect method. None of the other UDFs (ScalarFunction,
>> > > > AggregateFunction) have a means to emit records from close().
>> > > >
>> > > > To be honest, I am not sure what the consequences would be for the
>> > > > interplay with other operators that expect a TableFunction to emit only
>> > > > when eval is called. I'm not sure whether there are any such operators.
>> > > >
>> > > > If it is something we are certain we want to support, I'd be much
>> > > > more comfortable adding finish() to the TableFunction instead. I would
>> > > > be happy to hear opinions from the Table API folks.
>> > > >
>> > > > Best,
>> > > >
>> > > > Dawid
>> > > >
>> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
>> > > > > Thanks @Piotr for your valuable input!
>> > > > >
>> > > > > I did a quick read of the previous discussion you mentioned. It seems
>> > > > > my FLIP title doesn't give a clear scope here and causes some
>> > > > > confusion. If my understanding is correct, the UDFs in your context
>> > > > > are the user-implemented
>> > > > > `org.apache.flink.api.common.functions.Function`s, while the
>> > > > > `UserDefinedFunction` I mentioned in the FLIP is limited to the
>> > > > > flink-table module, located in `org.apache.flink.table.functions`.
>> > > > >
>> > > > > Here's a use case we met recently (which is indeed the motivation for
>> > > > > this proposal): one of our users implemented an
>> > > > > `org.apache.flink.table.functions.TableFunction`; the simplified
>> > > > > pseudo-code is as follows:
>> > > > >
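>> > > > > ```
>> > > > > // Helpers (initMemQueue, initPythonProc, waitForPythonFinish,
>> > > > > // drainQueue) and the queue field are omitted for brevity.
>> > > > > class XFunction extends TableFunction<Out> {
>> > > > >
>> > > > >     @Override
>> > > > >     public void open(FunctionContext context) throws Exception {
>> > > > >         initMemQueue();
>> > > > >         initPythonProc();
>> > > > >     }
>> > > > >
>> > > > >     public void eval(In in) {
>> > > > >         // hand the input over; results arrive asynchronously
>> > > > >         queue.offer(in);
>> > > > >         Out out = queue.poll();
>> > > > >         if (out != null) {
>> > > > >             collect(out);
>> > > > >         }
>> > > > >     }
>> > > > >
>> > > > >     @Override
>> > > > >     public void close() throws Exception {
>> > > > >         // legacy 'flush': emit whatever is still pending
>> > > > >         waitForPythonFinish();
>> > > > >         List<Out> outputs = drainQueue();
>> > > > >         outputs.forEach(this::collect);
>> > > > >     }
>> > > > > }
>> > > > > ```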
>> > > > > It worked well in older Flink versions, but when they recently
>> > > > > attempted an upgrade, the 'flush' logic in the legacy close() method
>> > > > > of `TableFunction` no longer worked properly.
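A toy Java simulation of the problem described above, under stated assumptions: `LifecycleSketch`, `Downstream` and `BufferingFunction` are hypothetical, not Flink classes, and the runner models the "wrong result" case by simply dropping anything emitted after the finish phase. It contrasts the legacy pattern (drain in `close()`) with draining in `finish()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation, not Flink code: eval* -> finish -> close with a buffering function. */
public class LifecycleSketch {

    /** Toy downstream; after markFinished() it stops delivering records (assumption). */
    public static final class Downstream {
        public final List<String> received = new ArrayList<>();
        public final List<String> dropped = new ArrayList<>();
        private boolean finished;

        void accept(String record) {
            if (finished) {
                dropped.add(record);
            } else {
                received.add(record);
            }
        }

        void markFinished() {
            finished = true;
        }
    }

    /** Toy buffering function in the spirit of XFunction: its output lags one record behind. */
    static final class BufferingFunction {
        private final Downstream out;
        private final boolean flushInFinish;
        private final List<String> queue = new ArrayList<>();

        BufferingFunction(Downstream out, boolean flushInFinish) {
            this.out = out;
            this.flushInFinish = flushInFinish;
        }

        void eval(String in) {
            queue.add(in);
            if (queue.size() > 1) {
                out.accept(queue.remove(0)); // emit the oldest pending result
            }
        }

        void finish() {
            if (flushInFinish) {
                drain(); // new style: flush while downstream still accepts records
            }
        }

        void close() {
            if (!flushInFinish) {
                drain(); // legacy style: flush during close()
            }
        }

        private void drain() {
            for (String record : queue) {
                out.accept(record);
            }
            queue.clear();
        }
    }

    /** open() is omitted for brevity; the order modelled is eval* -> finish -> close. */
    public static Downstream run(boolean flushInFinish, List<String> inputs) {
        Downstream out = new Downstream();
        BufferingFunction f = new BufferingFunction(out, flushInFinish);
        for (String in : inputs) {
            f.eval(in);
        }
        f.finish();
        out.markFinished(); // assumption: nothing emitted after the finish phase is delivered
        f.close();
        return out;
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("a", "b", "c");
        Downstream legacy = run(false, inputs);
        Downstream migrated = run(true, inputs);
        System.out.println("legacy:   " + legacy.received + " dropped=" + legacy.dropped);
        System.out.println("migrated: " + migrated.received + " dropped=" + migrated.dropped);
    }
}
```

Running `main` prints `legacy:   [a, b] dropped=[c]` and `migrated: [a, b, c] dropped=[]`: only the last buffered record is affected, which is why such a bug can go unnoticed on small inputs.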
>> > > > >
>> > > > > Before proposing the FLIP, I also considered a `flush()` extension on
>> > > > > `org.apache.flink.api.common.functions.Function`, because some SQL
>> > > > > operators are also related, but it is currently not included in the
>> > > > > scope of this FLIP; maybe we can discuss it in another thread.
>> > > > >
>> > > > > I hope this helps explain the reason, and your comments are welcome!
>> > > > >
>> > > > > Best,
>> > > > > Lincoln Lee
>> > > > >
>> > > > >
>> > > > > On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > > > >
>> > > > >> Hi Lincoln,
>> > > > >>
>> > > > >> Thanks for the proposal. Have you seen the old discussion about adding
>> > > > >> this `finish()` method? [1] We didn't add it to UDFs, as we didn't see
>> > > > >> a motivation (maybe we have missed something), and at the same time it
>> > > > >> wasn't that easy. Plain `finish()` wouldn't be enough. Users would need
>> > > > >> a way to output records from the `finish()` call, so it would have to
>> > > > >> be typed with the user record (`finish(Collector<T> output)`). On the
>> > > > >> other hand, we couldn't find an example where a user would actually
>> > > > >> need the `finish()` call in a UDF, as it seemed to us to make sense
>> > > > >> only for operators/functions that buffer records. Note that back then,
>> > > > >> during the discussion, we were referring to this method as `flush()`
>> > > > >> or `drain()`.
>> > > > >>
>> > > > >> Can you shed some more light and provide more details on the exact
>> > > > >> motivating example behind this proposal?
>> > > > >>
>> > > > >> Best,
>> > > > >> Piotrek
>> > > > >>
>> > > > >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
>> > > > >>
>> > > > >> On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com>
>> > > > >> wrote:
>> > > > >>
>> > > > >>> Hello everyone,
>> > > > >>>
>> > > > >>>    I'd like to open a discussion on FLIP-260 [1]: expose the finish
>> > > > >>> method for UserDefinedFunction. This gives users who rely on finish
>> > > > >>> logic in the legacy close() method (< 1.14) a way to migrate to the
>> > > > >>> new finish() method.
>> > > > >>>
>> > > > >>>    The task lifecycle was changed in FLINK-22972 [2]: a new finish()
>> > > > >>> phase was introduced (extracting the 'finish' part out of 'close')
>> > > > >>> and the dispose() method was removed. This change was also applied in
>> > > > >>> the table module (e.g., `AbstractMapBundleOperator` for mini-batch
>> > > > >>> operation), but it did not cover UserDefinedFunction, which only
>> > > > >>> exposes the open() and close() APIs for custom usage. Users who rely
>> > > > >>> on the legacy close() API may encounter wrong results or runtime
>> > > > >>> errors after upgrading to the new version. Strictly speaking, it is a
>> > > > >>> bug caused by the breaking change, but since fixing it requires a
>> > > > >>> public API change, we propose this FLIP.
>> > > > >>>
>> > > > >>>    Looking forward to your comments or feedback.
>> > > > >>>
>> > > > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
>> > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
>> > > > >>>
>> > > > >>> Best,
>> > > > >>> Lincoln Lee
>> > > > >>>
>> > > >
>> > >
>>
>
