[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046526#comment-17046526
 ] 

Piotr Nowojski commented on FLINK-15786:
----------------------------------------

[~maguowei], when I was initially thinking about Plugins, I was mostly 
targeting SQL Table Sinks and Sources. The points that you have raised are 
valid, and they do indeed currently make loading connectors via plugins 
challenging. However, I think there are ways to solve them, by changing the way 
we instantiate sources and sinks.

For example, currently it works like this:
1. The SourceFunction class is loaded by the user on the client side
2. The SourceFunction is instantiated/constructed on the client side, using some 
custom Java constructor (like 
{{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer#FlinkKafkaConsumer(java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema<T>, java.util.Properties)}})
3. The SourceFunction is serialised on the client side
4. The SourceFunction is deserialised on the TM
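
To make the current flow concrete, here is a minimal sketch of steps 1-4 with the 
universal Kafka connector (my own illustration, not code from this issue; it assumes 
flink-connector-kafka is on the client classpath and uses just one of the available 
constructor overloads):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CurrentClientSideInstantiation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The connector class is loaded and constructed on the client side,
        // so flink-connector-kafka (and its transitive dependencies) must be
        // part of the user jar / client classpath.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // The consumer instance is serialised into the job graph and later
        // deserialised on the TaskManager with the user-code class loader.
        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("current client-side instantiation");
    }
}
{code}

Both the construction in step 2 and the deserialisation in step 4 need the connector 
classes on the user-code class path, which is exactly what makes plugin loading hard.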

One potential solution would be to provide a new API for constructing sources. The 
user would create some kind of source descriptor on the client side, not the 
SourceFunction/SourceReader classes. Later, Flink's SourceOperator/SourceTask 
would use this generic descriptor to load the connector class via a plugin and 
instantiate it from there. For example:

1. The user defines a SourceDescriptor, maybe just a simple POJO with configuration, 
or even just a bag of properties (Map<String, String>).
2. The DataStream API on the client side could deduce from this descriptor the type 
of the DataStream (for example {{DataStream<MyFancyPojo>}}), so defining the rest of 
the DataStream application would look exactly the same to the user as it does right 
now.
3. The SourceDescriptor is serialised and sent over the network to the TaskManagers.
4. The TaskManager deserialises the SourceDescriptor, and the SourceOperator/SourceTask 
(which has no external dependencies)...
   a) checks in the descriptor which plugin to load, and loads the correct plugin 
class loader, for example "Kafka"
   b) uses some kind of builder/factory to construct the "Kafka" 
SourceFunction/SourceReader from the descriptor (rough sketch below)
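
A very rough sketch of what steps 4a/4b could look like. All class and method names 
below are made up for illustration; the real lookup would go through Flink's plugin 
mechanism, which is approximated here with a plain ServiceLoader against the plugin 
class loader:

{code:java}
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical sketch only - none of these classes exist in Flink today.

/** Generic, dependency-free descriptor that could live in flink-core. */
class SourceDescriptor implements Serializable {
    final String connectorName;                         // e.g. "kafka" -> which plugin to load
    final Map<String, String> properties = new HashMap<>();

    SourceDescriptor(String connectorName) {
        this.connectorName = connectorName;
    }
}

/** Implemented inside the connector plugin, discovered via service loading. */
interface SourceReaderFactory {
    boolean supports(String connectorName);

    /** Builds the actual reader; connector classes stay inside the plugin class loader. */
    Object createReader(SourceDescriptor descriptor);
}

/** What SourceOperator/SourceTask could do on the TaskManager (step 4a/4b). */
class SourceTaskSketch {
    Object createReader(SourceDescriptor descriptor, ClassLoader pluginClassLoader) {
        // Service discovery against the plugin class loader, similar in spirit
        // to how FileSystem plugins are discovered today.
        for (SourceReaderFactory factory :
                ServiceLoader.load(SourceReaderFactory.class, pluginClassLoader)) {
            if (factory.supports(descriptor.connectorName)) {
                return factory.createReader(descriptor);
            }
        }
        throw new IllegalStateException(
            "No plugin found for connector: " + descriptor.connectorName);
    }
}
{code}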

The important bit is that SourceDescriptor could be a completely generic Flink 
class, without any dependencies. Any external dependency would be loaded in the 
plugin's context on the TaskManager.


This might work with zero API changes in the Table API/SQL, as sources there are 
already defined in this way - with some generic descriptor. It would require 
designing a new DataStream API, though.
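
For comparison, this is roughly how a SQL source is already declared today - purely 
through generic string properties that a factory turns into a concrete source later. 
This is only a sketch assuming the 1.10-era {{sqlUpdate}} API; the exact property 
keys differ between versions and are purely illustrative:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlDescriptorExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // The source is described only by string properties; which connector
        // class gets instantiated from them is resolved later by a factory.
        // (Property keys below are illustrative, not tied to a specific version.)
        tEnv.sqlUpdate(
            "CREATE TABLE orders (" +
            "  order_id BIGINT," +
            "  amount DOUBLE" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.topic' = 'orders'," +
            "  'format.type' = 'json'" +
            ")");
    }
}
{code}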

Also, instead of introducing a "SourceDescriptor", maybe we could just re-use the 
existing {{StreamOperatorFactory}} concept. So treat the above as a high-level 
gist of an idea, not anything concrete.

> Load connector code with separate classloader
> ---------------------------------------------
>
>                 Key: FLINK-15786
>                 URL: https://issues.apache.org/jira/browse/FLINK-15786
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Guowei Ma
>            Priority: Major
>              Labels: usability
>
> Currently, connector code can be seen as part of the user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high probability of class conflicts 
> between different connectors, or with the user code of the same job. For 
> example, every one or two weeks we receive issue reports from our users about 
> connector class conflicts. The problem gets worse when users want to analyze 
> data from different sources and write output to different sinks.
> Using a separate classloader to load each connector's code could resolve 
> the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
