+1. This could be very useful for "dynamic" UDFs.

Just to clarify, if I understand correctly, we are trying to use an ENUM
indicator to:
(1) replace the current boolean isExecutable flag, and
(2) provide additional information that the ExecutionEnvironment uses to
decide when/where to use the file in the DistributedCache.

In this case, DistributedCache.CacheType or DistributedCache.FileType
sounds more intuitive than BlobType. What do you think?

Also, I was wondering whether there is any other useful information about
the cached file that needs to be passed to the runtime.
If we are only talking about including the library in the classloader,
could we directly extend the interface with

public void registerCachedFile(
    String filePath,
    String name,
    boolean executable,
    boolean includeInClassLoader)
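To make this alternative concrete, here is a minimal sketch of the two-flag
variant sitting next to the existing three-argument method.
CachedFileRegistry and CacheEntry are hypothetical stand-ins for
illustration, not the actual ExecutionEnvironment internals; the existing
method simply passes false for the new flag so its behavior does not change.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-file metadata that would be shipped to the runtime.
final class CacheEntry {
    final String filePath;
    final boolean executable;
    final boolean includeInClassLoader;

    CacheEntry(String filePath, boolean executable, boolean includeInClassLoader) {
        this.filePath = filePath;
        this.executable = executable;
        this.includeInClassLoader = includeInClassLoader;
    }
}

// Hypothetical stand-in for the cached-file registration in [Stream]ExecutionEnvironment.
class CachedFileRegistry {
    private final Map<String, CacheEntry> entries = new LinkedHashMap<>();

    // Existing signature: passes false for the new flag, so current callers are unaffected.
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name, executable, false);
    }

    // Proposed extension: the extra flag marks files for the user code classloader.
    public void registerCachedFile(
            String filePath, String name, boolean executable, boolean includeInClassLoader) {
        entries.put(name, new CacheEntry(filePath, executable, includeInClassLoader));
    }

    // Looks up what was registered under a given name (null if nothing was).
    public CacheEntry get(String name) {
        return entries.get(name);
    }
}
```

The trade-off versus a single enum: every new kind of hint adds another
boolean parameter, but a pair of flags can describe a file that is both
executable and on the classloader, which a single enum value cannot.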


Thanks,
Rong

On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:

> Hi Flink devs,
>
> In an effort to support loading external libraries and creating UDFs from
> external libraries using DDL in Flink SQL, we want to use Flink’s Blob
> Server to distribute the external libraries at runtime and load those
> libraries into the user code classloader automatically.
>
> However, the current [Stream]ExecutionEnvironment.registerCachedFile
> interface only supports registering executable or non-executable blobs.
> At runtime, there is no way to tell whether the blob files are libraries
> that should be loaded into the user code classloader in RuntimeContext.
> Therefore, I propose adding an enum called *BlobType* to explicitly
> indicate the type of the Blob file being distributed, together with the
> following interface in [Stream]ExecutionEnvironment to support it. More
> generally, the Flink runtime could use the new BlobType information to
> preprocess the Blob files if needed.
>
> /**
>  * Registers a file at the distributed cache under the given name. The file will be
>  * accessible from any user-defined function in the (distributed) runtime under a local
>  * path. Files may be local files (as long as all relevant workers have access to them),
>  * or files in a distributed file system. The runtime will copy the files temporarily to
>  * a local cache, if needed.
>  *
>  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained
>  * inside UDFs via
>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
>  * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via
>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>  *
>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
>  *                 "hdfs://host:port/and/path")
>  * @param name The name under which the file is registered.
>  * @param blobType The type of the Blob file being registered.
>  */
> public void registerCachedFile(String filePath, String name,
>         DistributedCache.BlobType blobType) {...}
>
> Optionally, we can add another method for registering UDF JARs, which
> will be implemented on top of the method above:
>
> public void registerJarFile(String filePath, String name) {...}
>
> The following existing method will be marked as deprecated:
>
> public void registerCachedFile(String filePath, String name, boolean
> executable) {...}
>
> And the following method will be implemented using the new method
> proposed above with an EXECUTABLE BlobType:
>
> public void registerCachedFile(String filePath, String name) {...}
>
> Thanks a lot.
> Shuyi
>
> "So you have to trust that the dots will somehow connect in your future."
>
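For comparison, the proposal quoted above (the BlobType enum, the new
registerCachedFile overload, the optional registerJarFile, the deprecated
boolean overload, and runtime preprocessing of library blobs) might be
sketched as follows. Everything here is hypothetical: the enum constants
EXECUTABLE, NON_EXECUTABLE, and LIBRARY are not defined in the proposal,
BlobType is top-level instead of nested in DistributedCache, and
CachedFiles and LibraryLoading are plain stand-ins for the
ExecutionEnvironment and the runtime, not Flink classes. The runtime step
simply puts every LIBRARY file on a child URLClassLoader.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed DistributedCache.BlobType; constants are assumptions.
enum BlobType { EXECUTABLE, NON_EXECUTABLE, LIBRARY }

// Hypothetical stand-in for the registration methods of [Stream]ExecutionEnvironment.
class CachedFiles {
    private final Map<String, String> paths = new LinkedHashMap<>();
    private final Map<String, BlobType> types = new LinkedHashMap<>();

    // Proposed primary method: the type travels with the file to the runtime.
    public void registerCachedFile(String filePath, String name, BlobType blobType) {
        paths.put(name, filePath);
        types.put(name, blobType);
    }

    // Optional convenience method for UDF JARs, built on the method above.
    public void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    // Existing three-argument method, deprecated and mapped onto the enum.
    @Deprecated
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name,
                executable ? BlobType.EXECUTABLE : BlobType.NON_EXECUTABLE);
    }

    // Existing two-argument method, routed through EXECUTABLE as the proposal states.
    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.EXECUTABLE);
    }

    public BlobType typeOf(String name) {
        return types.get(name);
    }
}

// Hypothetical runtime-side preprocessing of the locally cached copies.
final class LibraryLoading {
    private LibraryLoading() {}

    // Builds a child classloader containing only the local files tagged as LIBRARY.
    static URLClassLoader forLibraries(Map<File, BlobType> localCopies, ClassLoader parent) {
        List<URL> urls = new ArrayList<>();
        for (Map.Entry<File, BlobType> e : localCopies.entrySet()) {
            if (e.getValue() == BlobType.LIBRARY) {
                try {
                    urls.add(e.getKey().toURI().toURL());
                } catch (MalformedURLException ex) {
                    throw new UncheckedIOException(ex);
                }
            }
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }
}
```

The two-argument method is routed through EXECUTABLE because that is what
the proposal states; if I remember correctly, it currently registers a
non-executable file, so NON_EXECUTABLE may be the behavior-preserving
choice there.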
