> -----Original Message-----
> From: "Timo Walther" <twal...@apache.org>
> Sent: 2022-03-24 17:09:38 (Thursday)
> To: dev@flink.apache.org
> Cc: 
> Subject: Re: Re: Re: [DISCUSS] FLIP-214 Support Advanced Function DDL
> 
> Hi Ron,
> 
> thanks for opening this discussion and proposing a FLIP. This feature 
> has been requested multiple times before and it is definitely something 
> that the Flink community wants. However, we have to ensure that this 
> change fits into the overall architecture and doesn't break core 
> assumptions. The function stack is already pretty complex today and we 
> should encapsulate the change as much as possible from other components. 
> Some feedback from my side:
> 
> 1) Filesystem abstraction
> 
> I share Martijn's concerns. This FLIP seems to be HDFS-centric to me. 
> There are other filesystems such as S3 that should be considered as 
> well. It would be good to use Flink's filesystem abstraction to load 
> resources.
> 
> 2) Table API
> 
> Currently, the FLIP only mentions SQL syntax. Please also provide a 
> programmatic API. So far we offer `TableEnvironment.createFunction`, but 
> all of its variants take only classes or instances; we should provide a 
> method that takes a string and resource information.
> 
> 3) ResourceUri
> 
> You mention ResourceUri in the interface of CatalogFunction. Where does 
> this class come from? What does it contain?
> 
> 4) Function registration vs. function usage
> 
> You mentioned that a `CREATE FUNCTION ... USING JAR` will directly load 
> resources into the user class loader. This would break our current 
> assumptions. Currently, `CREATE` statements are pure metadata catalog 
> operations. For `CREATE TABLE`, we check neither for available connectors 
> nor for the validity of the provided options. We should stick to this 
> principle and keep `CREATE FUNCTION` an easy "put metadata into catalog" 
> operation.
> 
> In the end, the function usage is the important piece and not the 
> function registration. The planner needs to validate whether resources 
> are still available and load them when looking up a function in a catalog.
> 
> 5) Cluster execution
> 
> It would be great if you can elaborate a bit on the implementation by 
> mentioning internal classes. Does pipeline.jars support HDFS or other 
> file systems already? I think currently it is meant only to submit local 
> files via blob store. So how are the HDFS/S3 resources fetched during 
> execution in the cluster? Which entity fetches them?
> 
> 6) API Implementation
> 
> Could you elaborate roughly what should go into Planner, 
> FunctionCatalog, CatalogManager, TableEnvironment and higher layers?
> 
> What happens in case of an EXPLAIN? Will the loaded resources be cleaned 
> up?
> 
> What happens if TableEnvironment is used in an interactive session for 
> hundreds of submissions? Will the class loader grow infinitely?
> 
> I think we will need some "Resource Manager" entity (either in 
> CatalogManager or as a separate entity in TableEnvironment) that 
> performs bookkeeping of used/loaded resources. This entity should also 
> provide a UserClassLoader that inherits from the classloader given via 
> EnvironmentSettings.
> 
> The planner should return which resources it needs in the end. Take 
> `SELECT MyFunc(1)` as an example, this function will be executed on the 
> client side during constant folding, thus the resources of `MyFunc` are 
> not necessary anymore in the cluster. Only the planner knows this after 
> optimization. It might change the signature of Planner#translate to 
> return Tuple2<List<Transformation<?>>, List<Resources>>.
> 
> However, in Table API we resolve functions eagerly so we need to keep 
> the resources forever and should provide a cleanup method.
> 
> Regarding FLINK-15635, my colleague Francesco is currently working on 
> resolving this issue.
> 
> Regards,
> Timo
> 
> 
> 
> Am 24.03.22 um 09:18 schrieb Martijn Visser:
> > Hi Ron,
> >
> > Thanks for creating the FLIP. You're talking about both local and remote
> > resources. With regards to remote resources, how do you see this work with
> > Flink's filesystem abstraction? I did read in the FLIP that Hadoop
> > dependencies are not packaged, but I would hope that we do that for all
> > filesystem implementation. I don't think it's a good idea to have any tight
> > coupling to file system implementations, especially if at some point we
> > could also externalize file system implementations (like we're doing for
> > connectors already). I think the FLIP would be better by not only
> > referring to "Hadoop" as a remote resource provider, but a more generic
> > term since there are more options than Hadoop.
> >
> > I'm also thinking about security/operations implications: would it be
> > possible for bad actor X to create a JAR that either influences other
> > running jobs, leaks data or credentials or anything else? If so, I think it
> > would also be good to have an option to disable this feature completely. I
> > think there are roughly two types of companies who run Flink: those who
> > open it up for everyone to use (here the feature would be welcomed) and
> > those who need to follow certain minimum standards/have a more closed Flink
> > ecosystem. They usually want to validate a JAR upfront before making it
> > available, even at the expense of speed, because it gives them more control
> > over what will be running in their environment.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> >
> > On Wed, 23 Mar 2022 at 16:47, 刘大龙 <ld...@zju.edu.cn> wrote:
> >
> >>
> >>
> >>> -----Original Message-----
> >>> From: "Peter Huang" <huangzhenqiu0...@gmail.com>
> >>> Sent: 2022-03-23 11:13:32 (Wednesday)
> >>> To: dev <dev@flink.apache.org>
> >>> Cc:
> >>> Subject: Re: Re: Re: [DISCUSS] FLIP-214 Support Advanced Function DDL
> >>>
> >>> Hi Ron,
> >>>
> >>> Thanks for reviving the discussion of the work. The design looks good. A
> >>> small typo in the FLIP is that currently it is marked as released in
> >> 1.16.
> >>>
> >>> Best Regards
> >>> Peter Huang
> >>>
> >>>
> >>> On Tue, Mar 22, 2022 at 10:58 PM Mang Zhang <zhangma...@163.com> wrote:
> >>>
> >>>> hi Yuxia,
> >>>>
> >>>>
> >>>> Thanks for your reply. Your reminder is very important !
> >>>>
> >>>>
> >>>> Since we download the file to a local directory, we should remember to
> >> clean it up when
> >>>> the Flink client exits.
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>> Best regards,
> >>>> Mang Zhang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> At 2022-03-23 10:02:26, "罗宇侠(莫辞)"
> >>>> <luoyuxia.luoyu...@alibaba-inc.com.INVALID> wrote:
> >>>>> Hi Ron, Thanks for starting this discussion; some Spark/Hive users will
> >>>> benefit from it. The FLIP looks good to me. I just have two minor
> >> questions:
> >>>>> 1. For the syntax explanation, I see it's "Create .... function as
> >>>> identifier....". I think the word "identifier" may not be
> >>>> self-descriptive, since it is not an arbitrary name but the name of
> >> the
> >>>> class that provides the implementation of the function to be created.
> >>>>> Maybe it would be clearer to use "class_name" in place of "identifier",
> >> just
> >>>> like what Hive[1]/Spark[2] do.
> >>>>> 2.  >> If the resource used is a remote resource, it will first
> >> download
> >>>> the resource to a local temporary directory, which will be generated
> >> using
> >>>> UUID, and then register the local path to the user class loader.
> >>>>> Given the above explanation in this FLIP, it seems that for statement
> >> sets such as
> >>>>> ""
> >>>>> Create  function as org.apache.udf1 using jar 'hdfs://myudfs.jar';
> >>>>> Create  function as org.apache.udf2 using jar 'hdfs://myudfs.jar';
> >>>>> ""
> >>>>> the resource 'hdfs://myudfs.jar' will be downloaded twice. So is it
> >>>> possible to provide a cache mechanism so that we don't need to
> >> download /
> >>>> store it twice?
> >>>>>
> >>>>> Best regards,
> >>>>> Yuxia
> >>>>> [1]
> >> https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl
> >>>>> [2]
> >> https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-create-function.html
> >>>>> ------------------------------------------------------------------
> >>>>> From: Mang Zhang<zhangma...@163.com>
> >>>>> Date: 2022-03-22 11:35:24
> >>>>> To: <dev@flink.apache.org>
> >>>>> Subject: Re:[DISCUSS] FLIP-214 Support Advanced Function DDL
> >>>>>
> >>>>> Hi Ron, Thank you so much for this suggestion, this is so good.
> >>>>> In our company, using custom UDFs is very inconvenient for users:
> >> the
> >>>> code has to be packaged into the job jar,
> >>>>> and an existing UDF jar cannot simply be referenced,
> >>>>> unless the jar reference is passed in the startup command.
> >>>>> If we implement this feature, users can focus on their own business
> >>>> development.
> >>>>> I can also contribute if needed.
> >>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Best regards,
> >>>>> Mang Zhang
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> At 2022-03-21 14:57:32, "刘大龙" <ld...@zju.edu.cn> wrote:
> >>>>>> Hi, everyone
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> I would like to open a discussion on supporting advanced Function DDL;
> >>>> this proposal is a continuation of FLIP-79, in which the Flink Function DDL
> >> is
> >>>> defined. So far it has only been partially released, because the function DDL
> >> with
> >>>> user-defined resources has not been clearly discussed or implemented.
> >> Registering
> >>>> a UDF with a custom jar resource is an important feature:
> >>>> users can use UDFs more easily without having to put jars under the
> >>>> classpath in advance.
> >>>>>> Looking forward to your feedback.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Ron
> >>>>>>
> >>>>>>
> >> Hi, Peter, Thanks for your feedback. This work also builds on your
> >> efforts; thank you very much.
> >>

Hi, Timo

Thank you for the many suggestions; they have been very useful to me. I will 
address your points one by one.

1. Filesystem abstraction: I agree that we should use Flink's FileSystem 
abstraction to load resources; that is the better approach.

2. Table API: Good point. We should also support this feature at the Table API 
level; I will add it to the FLIP.

3. ResourceUri: This is a new class that contains the resource type and the 
resource path; it is modeled on the corresponding class in Hive [1]. We need it 
to encapsulate the type and path of a resource. In the future, we may support 
`CREATE FUNCTION ... USING FILE` or `CREATE FUNCTION ... USING ARCHIVE`.
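
To make the shape of this class more concrete, here is a minimal sketch of what 
I have in mind. The names `ResourceType`, `getResourceType` and `getUri` are 
only illustrative assumptions and not a final API; `FILE` and `ARCHIVE` are 
listed only because we may support them later.

```java
import java.util.Objects;

/** Kinds of resources a function may depend on (FILE/ARCHIVE are possible future additions). */
enum ResourceType { JAR, FILE, ARCHIVE }

/** Hypothetical value class pairing a resource type with its location. */
final class ResourceUri {
    private final ResourceType resourceType;
    private final String uri;

    ResourceUri(ResourceType resourceType, String uri) {
        this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
        this.uri = Objects.requireNonNull(uri, "uri");
    }

    ResourceType getResourceType() { return resourceType; }

    String getUri() { return uri; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ResourceUri)) return false;
        ResourceUri that = (ResourceUri) o;
        return resourceType == that.resourceType && uri.equals(that.uri);
    }

    @Override
    public int hashCode() { return Objects.hash(resourceType, uri); }

    @Override
    public String toString() {
        return "ResourceUri{resourceType=" + resourceType + ", uri='" + uri + "'}";
    }
}
```

Value semantics (equals/hashCode) are chosen here so that two functions that 
reference the same Jar can be recognized as sharing one resource.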

4. Function registration vs. function usage: The main reason I considered 
loading the resource into the user class loader directly at registration time 
was to verify in advance that the class specified by name actually exists in 
the Jar. That avoids discovering a missing class only when the function is 
used, which is helpful for users and is what Hive does. However, given Flink's 
current assumption that `CREATE` statements are pure metadata catalog 
operations, I agree that we should stick to this principle and keep `CREATE 
FUNCTION` an easy "put metadata into catalog" operation. We will load resources 
only when the function is used. In addition, I think we need to consider the 
case where `CREATE TABLE` uses a UDF registered via the `CREATE FUNCTION ... 
USING JAR` syntax, to ensure that the resources are available when the table is 
created; this case needs to be covered by tests.
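
The split between registration and usage could look roughly like the sketch 
below. It uses only the JDK; `LazyFunctionCatalog`, `createFunction` and 
`lookupFunction` are hypothetical names for illustration and are not the 
existing FunctionCatalog API.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical catalog: CREATE stores metadata only; resources are loaded at lookup. */
final class LazyFunctionCatalog {
    /** Metadata recorded by CREATE FUNCTION ... USING JAR. */
    static final class FunctionEntry {
        final String className;
        final URL jar;

        FunctionEntry(String className, URL jar) {
            this.className = className;
            this.jar = jar;
        }
    }

    private final Map<String, FunctionEntry> entries = new HashMap<>();
    private final ClassLoader parent;

    LazyFunctionCatalog(ClassLoader parent) { this.parent = parent; }

    /** Pure metadata operation: neither the jar nor the class is touched. */
    void createFunction(String name, String className, URL jar) {
        entries.put(name, new FunctionEntry(className, jar));
    }

    /** Function usage: only now is the jar consulted and the class resolved. */
    Class<?> lookupFunction(String name) throws ClassNotFoundException {
        FunctionEntry entry = entries.get(name);
        if (entry == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        // The loader has to stay open while the class is in use; a real
        // implementation would hand it to some bookkeeping entity instead.
        URLClassLoader loader = new URLClassLoader(new URL[] {entry.jar}, parent);
        return Class.forName(entry.className, false, loader);
    }
}
```

With this split, registering a function whose Jar is missing succeeds, and the 
error only surfaces at lookup time. That is the trade-off against the Hive-style 
early check.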

5. Cluster execution: As mentioned in the FLIP, remote resources are downloaded 
by the client. We first download the resources to a temporary directory and 
then, when the job is submitted, upload the files in that directory to the blob 
store via pipeline.jars. The core principle is that remote resources such as 
those on HDFS/S3 are uniformly fetched by the client and then uploaded to the 
blob store; during distributed execution, the TaskManagers pull the Jar 
resources directly from the blob store.
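
As a rough illustration of the client-side step, the sketch below downloads 
each distinct URI once into a temporary directory and remembers the local copy, 
so that two functions using the same Jar do not trigger two downloads. 
`ClientResourceDownloader`, `fetch` and `localJarUris` are hypothetical names. 
The JDK call `URI#toURL().openStream()` is only a stand-in here: it works for 
`file:` or `http:` URIs but not for `hdfs:` or `s3:`, which is where Flink's 
FileSystem abstraction would be used instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical client-side helper: fetch each remote resource once into a temp directory. */
final class ClientResourceDownloader {
    private final Path localDir;
    private final Map<URI, Path> cache = new LinkedHashMap<>();

    ClientResourceDownloader() throws IOException {
        // Temporary directory whose name contains a UUID, one per client session.
        this.localDir = Files.createTempDirectory("flink-udf-" + UUID.randomUUID());
    }

    /** Returns the local copy, downloading only on the first request for a given URI. */
    synchronized Path fetch(URI remote) throws IOException {
        Path cached = cache.get(remote);
        if (cached != null) {
            return cached;
        }
        String path = remote.getPath();
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        // The index prefix keeps equally named files from different URIs apart.
        Path target = localDir.resolve(cache.size() + "-" + fileName);
        // Stand-in for opening the resource through a filesystem abstraction.
        try (InputStream in = remote.toURL().openStream()) {
            Files.copy(in, target);
        }
        cache.put(remote, target);
        return target;
    }

    /** Local files to ship with the job, for example as the value of pipeline.jars. */
    synchronized List<String> localJarUris() {
        List<String> uris = new ArrayList<>();
        for (Path p : cache.values()) {
            uris.add(p.toUri().toString());
        }
        return uris;
    }
}
```

The temporary directory should also be deleted when the client exits; that part 
is left out of the sketch.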

6. API Implementation: 

>> I think we will need some "Resource Manager" entity (either in
>> CatalogManager or as a separate entity in TableEnvironment) that
>> performs bookkeeping of used/loaded resources. This entity should also
>> provide a UserClassLoader that inherits from the classloader given via
>> EnvironmentSettings.

I agree with this point. We should introduce a "Resource Manager" entity 
(either in CatalogManager or as a separate entity in TableEnvironment) that 
performs bookkeeping of used/loaded resources. This entity should also provide 
a UserClassLoader that inherits from the classloader given via 
EnvironmentSettings. In the core code design section of the FLIP, I proposed a 
`UserClassLoaderContext` entity, whose role is similar to that of the "Resource 
Manager".
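
A minimal sketch of such an entity, using only the JDK, might look as follows. 
`UdfResourceManager` and its methods are hypothetical names, not the design 
from the FLIP; the `parent` argument stands for the class loader that would come 
from EnvironmentSettings.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical "Resource Manager": tracks registered jars and owns the user class loader. */
final class UdfResourceManager implements AutoCloseable {
    /** URLClassLoader#addURL is protected, so a small subclass makes it callable. */
    private static final class MutableUrlClassLoader extends URLClassLoader {
        MutableUrlClassLoader(ClassLoader parent) { super(new URL[0], parent); }

        @Override
        public void addURL(URL url) { super.addURL(url); }
    }

    // Keyed by the URL's string form to avoid URL#equals, which may resolve host names.
    private final Set<String> registeredJars = new LinkedHashSet<>();
    private final MutableUrlClassLoader userClassLoader;

    UdfResourceManager(ClassLoader parent) {
        this.userClassLoader = new MutableUrlClassLoader(parent);
    }

    /** Adds the jar to the user class loader once; returns false if it was already known. */
    synchronized boolean registerJar(URL localJar) {
        if (!registeredJars.add(localJar.toExternalForm())) {
            return false;
        }
        userClassLoader.addURL(localJar);
        return true;
    }

    /** Snapshot of everything registered so far, in registration order. */
    synchronized Set<String> getRegisteredJars() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(registeredJars));
    }

    ClassLoader getUserClassLoader() { return userClassLoader; }

    /** Cleanup hook: releases the jar files held open by the class loader. */
    @Override
    public void close() throws IOException { userClassLoader.close(); }
}
```

Keeping the bookkeeping and the class loader in one place gives a single spot 
for a cleanup method.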

>> The planner should return which resources it needs in the end. Take
>> `SELECT MyFunc(1)` as an example, this function will be executed on the
>> client side during constant folding, thus the resources of `MyFunc` are
>> not necessary anymore in the cluster. Only the planner knows this after
>> optimization. It might change the signature of Planner#translate to
>> return Tuple2<List<Transformation<?>>, List<Resources>>.
>>
>> However, in Table API we resolve functions eagerly so we need to keep
>> the resources forever and should provide a cleanup method.

IMO this is a good point. However, I consider it an improvement rather than a 
requirement. Shipping unneeded resources currently has no significant impact 
on overall job startup/execution, whereas the change would be more complicated, 
so we should make a trade-off.

>> What happens if TableEnvironment is used in an interactive session for
>> hundreds of submissions? Will the class loader grow infinitely?

First of all, a session is not kept open indefinitely; it is generally closed 
after use. Secondly, if an interactive session accepts a lot of submissions, 
the user needs to adjust the memory available for class metadata to avoid OOM. 
Otherwise, the number of submissions that can be supported is naturally 
limited. I think this should be left to the user.

>> What happens in case of an EXPLAIN? Will the loaded resources be cleaned up?

As I understand this question, you are asking about the cleanup mechanism for 
classes in the ClassLoader. For class cleanup, I think we can remove the used 
Jar resources from the ClassLoader after the query is compiled and load them 
into the ClassLoader again the next time they are used. The Jar itself is 
cached locally, so it will not be downloaded again. I think this also solves 
the problem of the class loader growing infinitely in an interactive session 
with hundreds of submissions.
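
One possible way to express this idea with plain JDK classes is a class loader 
that lives only for the duration of one compilation, built from the Jars that 
are already cached locally. `QueryScopedClassLoading` and `compileWith` are 
hypothetical names, and the `compile` callback merely stands in for the planner. 
Note the assumption behind it: closing the loader releases the Jar files, but 
the loaded classes can only be unloaded by the JVM once nothing references them 
anymore.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: one short-lived class loader per compiled query. */
final class QueryScopedClassLoading {
    private QueryScopedClassLoading() {}

    /**
     * Runs {@code compile} with a fresh class loader over the locally cached jars
     * and closes that loader afterwards, so nothing accumulates across queries.
     */
    static <T> T compileWith(
            List<URL> cachedLocalJars, ClassLoader parent, Function<ClassLoader, T> compile)
            throws IOException {
        try (URLClassLoader queryLoader =
                new URLClassLoader(cachedLocalJars.toArray(new URL[0]), parent)) {
            return compile.apply(queryLoader);
        }
    }
}
```

Because the Jars are read from the local cache, creating a new loader for the 
next query does not cause another download.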

Overall, since FunctionCatalog is responsible for UDF registration and 
discovery, I think `FunctionCatalog` should go into Planner, which holds the 
Resource Manager entity. The Resource Manager is responsible for loading 
resources into the class loader.

>> Regarding FLINK-15635, my colleague Francesco is currently working on 
>> resolving this issue.

I am very happy to hear this; that works for me.


Thank you again for these valuable suggestions; I will update the FLIP 
accordingly.

[1] 
https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1012

Best,

Ron
