I think this functionality is already there; we just have to expose it in the
right places. ClusterClient.submitJob() takes a JobGraph, and JobGraph has an
addJar() method for adding jars that need to be in the classloader for
executing a user program.
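
To illustrate what "in the classloader" means here, a rough, self-contained
sketch follows. JobJars and JarDemo are hypothetical stand-ins made up for this
mail; they are not Flink classes, and Flink's actual JobGraph and user-code
class loading are more involved. The sketch only shows the idea: collect jars
on the job description, then build a class loader that can see all of them.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

/** Hypothetical stand-in for a job description that carries user jars (not Flink's JobGraph). */
final class JobJars {
    private final List<Path> jars = new ArrayList<>();

    /** Remembers a jar that user code needs; registering the same path twice has no effect. */
    void addJar(Path jar) {
        if (!jars.contains(jar)) {
            jars.add(jar);
        }
    }

    List<Path> getJars() {
        return jars;
    }

    /** Builds a class loader that can see every registered jar. */
    URLClassLoader createUserCodeClassLoader(ClassLoader parent) throws IOException {
        URL[] urls = new URL[jars.size()];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = jars.get(i).toUri().toURL();
        }
        return new URLClassLoader(urls, parent);
    }
}

final class JarDemo {
    /** Writes a tiny jar holding one text entry, so the demo needs no compiler. */
    static Path writeJar(String entryName, String content) throws IOException {
        Path jar = Files.createTempFile("udf-", ".jar");
        try (OutputStream out = Files.newOutputStream(jar);
             JarOutputStream jarOut = new JarOutputStream(out)) {
            jarOut.putNextEntry(new JarEntry(entryName));
            jarOut.write(content.getBytes(StandardCharsets.UTF_8));
            jarOut.closeEntry();
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        Path jar = writeJar("udf-marker.txt", "hello from the user jar");
        JobJars job = new JobJars();
        job.addJar(jar);
        // The entry is only reachable because the jar was added to the loader.
        try (URLClassLoader userLoader =
                 job.createUserCodeClassLoader(JarDemo.class.getClassLoader())) {
            boolean visible = userLoader.getResource("udf-marker.txt") != null;
            System.out.println("entry visible: " + visible);
        }
        Files.deleteIfExists(jar);
    }
}
```

The demo uses a plain text entry instead of a compiled UDF class to keep it
short; a class packaged in the jar would be resolved by the same loader.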

> On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ted,
> 
> The design doc is in late draft status and proposes support for SQL DDL
> statements (CREATE TABLE, CREATE FUNCTION, etc.).
> The question about registering JARs came up because we need a way to
> distribute JAR files that contain the code of user-defined functions.
> 
> The design doc will soon be shared on the dev mailing list to gather
> feedback from the community.
> 
> Best, Fabian
> 
> 2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
> 
>> bq. In a design document, Timo mentioned that we can ship multiple JAR
>> files
>> 
>> Mind telling us where the design doc can be retrieved?
>> 
>> Thanks
>> 
>> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> I'm not sure if we need to modify the existing method.
>>> What we need is a bit different from what registerCachedFile() provides.
>>> The method ensures that a file is copied to each TaskManager and can be
>>> locally accessed from a function's RuntimeContext.
>>> In our case, we don't need to access the file but would like to make sure
>>> that it is loaded into the class loader.
>>> So, we could also just add a method like registerUserJarFile().
>>> 
>>> In a design document, Timo mentioned that we can ship multiple JAR files
>>> with a job.
>>> So, we could also implement the UDF shipping logic by loading the JAR
>>> file(s) to the client and distributing them from there.
>>> In that case, we would not need to add a new method to the execution
>>> environment.
>>> 
>>> Best,
>>> Fabian
>>> 
>>> 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>> 
>>>> +1. This could be very useful for "dynamic" UDF.
>>>> 
>>>> Just to clarify, if I understand correctly, we are trying to use an ENUM
>>>> indicator to
>>>> (1) Replace the current Boolean isExecutable flag.
>>>> (2) Provide additional information used by ExecutionEnvironment to decide
>>>> when/where to use the DistributedCache file.
>>>> 
>>>> In this case, DistributedCache.CacheType or DistributedCache.FileType
>>>> sounds more intuitive, what do you think?
>>>> 
>>>> Also, I was wondering whether there is any other useful information about
>>>> the cached file that should be passed to the runtime.
>>>> If we are just talking about including the library in the classloader,
>>>> can we directly extend the interface with
>>>> 
>>>> public void registerCachedFile(
>>>>    String filePath,
>>>>    String name,
>>>>    boolean executable,
>>>>    boolean includeInClassLoader)
>>>> 
>>>> 
>>>> Thanks,
>>>> Rong
>>>> 
>>>> On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
>>>> 
>>>>> Hi Flink devs,
>>>>> 
>>>>> In an effort to support loading external libraries and creating UDFs from
>>>>> external libraries using DDL in Flink SQL, we want to use Flink’s Blob
>>>>> Server to distribute the external libraries at runtime and load those
>>>>> libraries into the user code classloader automatically.
>>>>> 
>>>>> However, the current [Stream]ExecutionEnvironment.registerCachedFile
>>>>> interface only supports registering executable or non-executable blobs.
>>>>> It’s not possible to tell at runtime whether the blob files are libraries
>>>>> that should be loaded into the user code classloader in RuntimeContext.
>>>>> Therefore, I propose adding an enum called *BlobType* to explicitly
>>>>> indicate the type of the Blob file being distributed, and the following
>>>>> interface in [Stream]ExecutionEnvironment to support it. In general, I
>>>>> think the new BlobType information can be used by the Flink runtime to
>>>>> preprocess the Blob files if needed.
>>>>> 
>>>>> /**
>>>>>  * Registers a file at the distributed cache under the given name. The file
>>>>>  * will be accessible from any user-defined function in the (distributed)
>>>>>  * runtime under a local path. Files may be local files (as long as all
>>>>>  * relevant workers have access to it), or files in a distributed file
>>>>>  * system. The runtime will copy the files temporarily to a local cache, if
>>>>>  * needed.
>>>>>  *
>>>>>  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can
>>>>>  * be obtained inside UDFs via
>>>>>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>>>>>  * and provides access to
>>>>>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>>>>>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>>>>  *
>>>>>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
>>>>>  *     or "hdfs://host:port/and/path")
>>>>>  * @param name The name under which the file is registered.
>>>>>  * @param blobType indicating the type of the Blob file
>>>>>  */
>>>>> public void registerCachedFile(String filePath, String name,
>>>>>     DistributedCache.BlobType blobType) {...}
>>>>> 
>>>>> Optionally, we can add another interface to register UDF Jars, which
>>>>> will be implemented using the interface above.
>>>>> 
>>>>> public void registerJarFile(String filePath, String name) {...}
>>>>> 
>>>>> The following existing interface will be marked deprecated:
>>>>> 
>>>>> public void registerCachedFile(String filePath, String name, boolean
>>>>>     executable) {...}
>>>>> 
>>>>> And the following interface will be implemented using the new interface
>>>>> proposed above with an EXECUTABLE BlobType:
>>>>> 
>>>>> public void registerCachedFile(String filePath, String name) {...}
>>>>> 
>>>>> Thanks a lot.
>>>>> Shuyi
>>>>> 
>>>>> "So you have to trust that the dots will somehow connect in your future."
>>>>> 
>>>> 
>>> 
>> 
