I think this functionality already exists; we just have to expose it in the right places: ClusterClient.submitJob() takes a JobGraph, and JobGraph has an addJar() method for adding JARs that need to be on the class loader when executing a user program.
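For context, the user-code class loading that JobGraph.addJar() ultimately feeds is, conceptually, a URLClassLoader built over the shipped JAR paths. Below is a minimal, self-contained sketch of that mechanism; the class and method names here are illustrative stand-ins, not Flink's actual runtime classes (Flink's real logic lives in its library cache management):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.List;

// Illustrative stand-in for how shipped JARs end up on the user-code
// class loader. This is a conceptual sketch, not Flink's implementation.
public class UserCodeClassLoaderSketch {

    // Build a class loader whose parent is the framework's loader, so
    // framework classes resolve normally while user classes come from
    // the shipped JARs.
    static URLClassLoader buildUserCodeClassLoader(List<Path> shippedJars) throws Exception {
        URL[] urls = new URL[shippedJars.size()];
        for (int i = 0; i < shippedJars.size(); i++) {
            urls[i] = shippedJars.get(i).toUri().toURL();
        }
        return new URLClassLoader(urls, UserCodeClassLoaderSketch.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        // With no JARs registered, the loader still delegates to its parent.
        URLClassLoader cl = buildUserCodeClassLoader(List.of());
        System.out.println(cl.loadClass("java.lang.String").getName()); // prints "java.lang.String"
    }
}
```

The point of the sketch is only that "adding a JAR" and "making it visible to user code" are two separate steps: distribution ships the bytes, and the classloader construction decides what is visible.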
> On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Ted,
>
> The design doc is in late draft status and proposes support for SQL DDL
> statements (CREATE TABLE, CREATE FUNCTION, etc.).
> The question about registering JARs came up because we need a way to
> distribute JAR files that contain the code of user-defined functions.
>
> The design doc will soon be shared on the dev mailing list to gather
> feedback from the community.
>
> Best, Fabian
>
> 2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>
>> bq. In a design document, Timo mentioned that we can ship multiple JAR
>> files
>>
>> Mind telling us where the design doc can be retrieved?
>>
>> Thanks
>>
>> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm not sure if we need to modify the existing method.
>>> What we need is a bit different from what registerCachedFile() provides.
>>> That method ensures that a file is copied to each TaskManager and can be
>>> accessed locally from a function's RuntimeContext.
>>> In our case, we don't need to access the file but would like to make sure
>>> that it is loaded into the class loader.
>>> So, we could also just add a method like registerUserJarFile().
>>>
>>> In a design document, Timo mentioned that we can ship multiple JAR files
>>> with a job.
>>> So, we could also implement the UDF shipping logic by loading the JAR
>>> file(s) on the client and distributing them from there.
>>> In that case, we would not need to add a new method to the execution
>>> environment.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>
>>>> +1. This could be very useful for "dynamic" UDFs.
>>>>
>>>> Just to clarify, if I understand correctly, we are trying to use an enum
>>>> indicator to
>>>> (1) Replace the current boolean isExecutable flag.
>>>> (2) Provide additional information used by the ExecutionEnvironment to
>>>> decide when/where to use the DistributedCache file.
>>>>
>>>> In this case, DistributedCache.CacheType or DistributedCache.FileType
>>>> sounds more intuitive, what do you think?
>>>>
>>>> Also, I was wondering whether there is any other useful information for
>>>> the cached file to be passed to the runtime.
>>>> If we are just talking about including the library in the classloader,
>>>> can we directly extend the interface with
>>>>
>>>> public void registerCachedFile(
>>>>     String filePath,
>>>>     String name,
>>>>     boolean executable,
>>>>     boolean includeInClassLoader)
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
>>>>
>>>>> Hi Flink devs,
>>>>>
>>>>> In an effort to support loading external libraries and creating UDFs
>>>>> from external libraries using DDL in Flink SQL, we want to use Flink's
>>>>> Blob Server to distribute the external libraries at runtime and load
>>>>> those libraries into the user code classloader automatically.
>>>>>
>>>>> However, the current [Stream]ExecutionEnvironment.registerCachedFile
>>>>> interface is limited to registering executable or non-executable blobs.
>>>>> It's not possible to tell at runtime whether the blob files are
>>>>> libraries that should be loaded into the user code classloader in
>>>>> RuntimeContext. Therefore, I want to propose adding an enum called
>>>>> BlobType to explicitly indicate the type of the blob file being
>>>>> distributed, and the following interface in [Stream]ExecutionEnvironment
>>>>> to support it. In general, I think the new BlobType information can be
>>>>> used by the Flink runtime to preprocess the blob files if needed.
>>>>>
>>>>> /**
>>>>>  * Registers a file at the distributed cache under the given name. The
>>>>>  * file will be accessible from any user-defined function in the
>>>>>  * (distributed) runtime under a local path. Files may be local files
>>>>>  * (as long as all relevant workers have access to it), or files in a
>>>>>  * distributed file system. The runtime will copy the files temporarily
>>>>>  * to a local cache, if needed.
>>>>>  *
>>>>>  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext}
>>>>>  * can be obtained inside UDFs via
>>>>>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>>>>>  * and provides access to
>>>>>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>>>>>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>>>>  *
>>>>>  * @param filePath The path of the file, as a URI (e.g.
>>>>>  *                 "file:///some/path" or "hdfs://host:port/and/path")
>>>>>  * @param name     The name under which the file is registered.
>>>>>  * @param blobType indicating the type of the blob file
>>>>>  */
>>>>> public void registerCachedFile(String filePath, String name,
>>>>>     DistributedCache.BlobType blobType) {...}
>>>>>
>>>>> Optionally, we can add another interface to register UDF JARs, which
>>>>> will be implemented using the interface above:
>>>>>
>>>>> public void registerJarFile(String filePath, String name) {...}
>>>>>
>>>>> The following existing interface will be marked deprecated:
>>>>>
>>>>> public void registerCachedFile(String filePath, String name,
>>>>>     boolean executable) {...}
>>>>>
>>>>> And the following interface will be implemented using the new interface
>>>>> proposed above with an EXECUTABLE BlobType:
>>>>>
>>>>> public void registerCachedFile(String filePath, String name) { ... }
>>>>>
>>>>> Thanks a lot.
>>>>> Shuyi
>>>>>
>>>>> "So you have to trust that the dots will somehow connect in your
>>>>> future."
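To make the shape of Shuyi's proposal concrete, here is a self-contained sketch of how the new enum and the backward-compatible overloads could fit together. The class name, the enum constants beyond EXECUTABLE, and the internal map are illustrative assumptions, not part of the proposal; the overload-to-enum mappings follow what the mail above states (including the two-argument overload mapping to EXECUTABLE):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the thread's proposal: an explicit BlobType replaces the
// boolean `executable` flag, and the existing overloads delegate to
// the new method. Names not quoted in the thread are illustrative.
public class CachedFileRegistrySketch {

    public enum BlobType {
        FILE,        // plain cached file
        EXECUTABLE,  // file that must be marked executable locally
        LIBRARY      // JAR to be loaded into the user-code class loader
    }

    // name -> (filePath, blobType); a stand-in for the real registry.
    private final Map<String, Map.Entry<String, BlobType>> cachedFiles = new LinkedHashMap<>();

    // New interface proposed in the thread.
    public void registerCachedFile(String filePath, String name, BlobType blobType) {
        cachedFiles.put(name, Map.entry(filePath, blobType));
    }

    // Existing boolean-based overload, to be deprecated; expressed in
    // terms of the new enum.
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name, executable ? BlobType.EXECUTABLE : BlobType.FILE);
    }

    // Existing two-argument overload; the mail above proposes mapping
    // it to the EXECUTABLE blob type.
    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.EXECUTABLE);
    }

    // Optional convenience for UDF JARs, built on the new interface.
    public void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    public BlobType blobTypeOf(String name) {
        return cachedFiles.get(name).getValue();
    }
}
```

With this shape, the runtime can branch on BlobType when materializing the cache entries (e.g. only LIBRARY blobs are handed to the user-code class loader), which is exactly the information the boolean flag could not carry.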