[
https://issues.apache.org/jira/browse/FLINK-24558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-24558:
-----------------------------------
Labels: pull-request-available (was: )
> dataStream can not use multiple classloaders
> ---------------------------------------------
>
> Key: FLINK-24558
> URL: https://issues.apache.org/jira/browse/FLINK-24558
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: bai sui
> Priority: Minor
> Labels: pull-request-available
> Attachments: Flink ClassLoader优化 (1).png
>
>
> The DataStream demo below is a minimal example: it reads stream data from a
> SourceFunction and sends it to a SinkFunction without any processing.
> The point is that the SourceFunction and SinkFunction instances are created
> with two separate URLClassLoaders that have different dependencies, in order
> to avoid dependency conflicts between them.
> The problem is that when the Flink client submits the job to the server, the
> server side throws a ClassNotFoundException for classes defined in those
> class loaders' dependencies. Evidently the server side does not use the same
> class loaders as the client side.
> How can I solve this problem? Can anyone give me some advice?
> Thanks a lot.
>
> {code:java}
> public class FlinkStreamDemo {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         SourceFunction<DTO> sourceFunc = createSourceFunction();
>         DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);
>         SinkFunction<DTO> sinkFunction = createSink();
>         dtoDataStreamSource.addSink(sinkFunction);
>         env.execute("flink-example");
>     }
>
>     private static SinkFunction<DTO> createSink() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISinkFunctionFactory> loaders =
>             ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
>         Iterator<ISinkFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>
>     private static SourceFunction<DTO> createSourceFunction() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISourceFunctionFactory> loaders =
>             ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
>         Iterator<ISourceFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>
>     public interface ISinkFunctionFactory {
>         SinkFunction<DTO> create();
>     }
>
>     public interface ISourceFunctionFactory {
>         SourceFunction<DTO> create();
>     }
> }
> {code}
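
The behaviour described above can be reproduced with plain JDK serialization. What follows is a minimal sketch, not Flink's actual code path: it assumes the functions are shipped to the server as serialized objects and that the receiving side resolves classes through whichever class loader it holds, not the URLClassLoader that created the instance on the client. The names ClassLoaderAwareDeserialization, Payload and LoaderAwareInputStream are hypothetical and exist only for this illustration. Payload stands in for a function class that only one loader can see.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderAwareDeserialization {

    /** Hypothetical stand-in for a function class visible to only one class loader. */
    public static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String value;

        public Payload(String value) {
            this.value = value;
        }
    }

    /** ObjectInputStream that resolves classes through an explicitly supplied loader. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Only the supplied loader is consulted; the loader that created
            // the original instance is not recorded in the byte stream.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Serializes an object to bytes, as a client would before shipping it. */
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Deserializes bytes, resolving every class through the given loader. */
    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new Payload("hello"));

        // A loader that can see Payload restores the object.
        Payload restored = (Payload) deserialize(data, Payload.class.getClassLoader());
        System.out.println("restored: " + restored.value);

        // A loader with no URLs and no parent sees only JDK bootstrap classes,
        // so resolving Payload fails, much like the server side in the report.
        try (URLClassLoader bootstrapOnly = new URLClassLoader(new URL[0], null)) {
            deserialize(data, bootstrapOnly);
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

If this assumption holds for the demo, the jars behind both client-side URLClassLoaders would also have to be reachable from the class loader that the server uses for the job. Creating the loaders on the client alone does not carry them across.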