[
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046526#comment-17046526
]
Piotr Nowojski commented on FLINK-15786:
----------------------------------------
[~maguowei], when I was initially thinking about Plugins, I was mostly
targeting SQL Table Sinks and Sources. The points that you have raised are
valid, and they do currently make loading connectors via plugins challenging.
However, I think there are ways to solve them, by changing the way we
instantiate sources and sinks.
For example, currently it works like this:
1. The SourceFunction class is loaded by the user on the client side
2. The SourceFunction is instantiated/constructed on the client side, using
some custom Java constructor (like
{{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer#FlinkKafkaConsumer(java.lang.String,
org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema<T>,
java.util.Properties)}})
3. The SourceFunction is serialised on the client side
4. The SourceFunction is deserialised on the TM
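To make this concrete, a minimal sketch of the current pattern (using the
standard Kafka connector; this uses the overload taking a plain
{{DeserializationSchema}} rather than the {{KafkaDeserializationSchema}} one
cited above):
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CurrentInstantiation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: the connector class is constructed directly on the client
        // side, so flink-connector-kafka must be on the client's classpath.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                "my-topic",
                new SimpleStringSchema(),
                props);

        // Steps 3 and 4: addSource() captures the SourceFunction object,
        // which is serialised with the job graph and deserialised on the TMs.
        DataStream<String> stream = env.addSource(source);
        stream.print();
        env.execute("current-instantiation");
    }
}
{code}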
One potential solution would be to provide a new API for constructing sources.
The user would create some kind of source descriptor on the client side, rather
than the SourceFunction/SourceReader classes themselves. Later, Flink's
SourceOperator/SourceTask would use this generic descriptor to load the
connector class via a plugin and instantiate it from there. For example:
1. The user defines a SourceDescriptor, maybe just a simple POJO with
configuration, or even just some properties (Map<String, String>).
2. The DataStream API on the client side could deduce from this descriptor the
type of the resulting DataStream (for example {{DataStream<MyFancyPojo>}}), so
defining the rest of the DataStream application would look exactly the same
from the user's perspective as it does right now.
3. The SourceDescriptor is serialised and sent over the network to the
TaskManagers.
4. The TaskManager deserialises the SourceDescriptor, and the
SourceOperator/SourceTask (which itself has no external dependencies)...
   a) checks in the descriptor which plugin to load - and loads the correct
plugin class loader, for example "Kafka"
   b) uses some kind of builder/factory to construct "Kafka"'s
SourceFunction/SourceReader from the descriptor (see the sketch after this
list)
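A rough sketch of steps 1 and 4; the names {{SourceDescriptor}} and
{{SourceConnectorFactory}}, and the plugin wiring, are all hypothetical and
only show the shape of the idea:
{code:java}
import java.io.Serializable;
import java.util.Map;
import java.util.ServiceLoader;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Step 1 (hypothetical): a completely generic descriptor living in Flink
// core, with no connector dependencies. Only this object is serialised and
// shipped to the TaskManagers (step 3).
class SourceDescriptor implements Serializable {
    final String pluginName;               // which plugin to load, e.g. "kafka"
    final Map<String, String> properties;  // connector configuration

    SourceDescriptor(String pluginName, Map<String, String> properties) {
        this.pluginName = pluginName;
        this.properties = properties;
    }
}

// Step 4b (hypothetical): an SPI that each connector plugin implements. It
// is discovered inside the plugin's class loader on the TM, so the heavy
// connector dependencies never leak into Flink's or the user's classpath.
interface SourceConnectorFactory<T> {
    boolean supports(SourceDescriptor descriptor);

    SourceFunction<T> createSource(SourceDescriptor descriptor);
}

class SourceOperatorSketch {
    // Step 4a (hypothetical wiring): 'pluginClassLoader' would be obtained
    // from Flink's plugin manager for descriptor.pluginName.
    static <T> SourceFunction<T> instantiate(
            SourceDescriptor descriptor, ClassLoader pluginClassLoader) {
        for (SourceConnectorFactory<?> factory :
                ServiceLoader.load(SourceConnectorFactory.class, pluginClassLoader)) {
            if (factory.supports(descriptor)) {
                @SuppressWarnings("unchecked")
                SourceFunction<T> source =
                        (SourceFunction<T>) factory.createSource(descriptor);
                return source;
            }
        }
        throw new IllegalStateException(
                "No factory for plugin " + descriptor.pluginName);
    }
}
{code}
With this shape, only the descriptor needs to be serialisable; all class
resolution happens on the TM, inside the plugin's class loader.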
The important bit is that the SourceDescriptor could be a completely generic
Flink class, without any dependencies. Any external dependency would be loaded
in the plugin's context on the TaskManager.
This might work with zero API changes in the Table API/SQL, as sources there
are already defined in this way - with a generic descriptor. It would require
designing a new DataStream API, though.
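For comparison, this is roughly how a Table API source is already defined via
the (Flink 1.10-era) {{connect()}} descriptor API - the user only supplies
generic properties, never the connector class itself (the topic and field
names here are made up for illustration):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class TableDescriptorExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Everything below boils down to a Map<String, String> of properties;
        // the actual Kafka source is created later by a factory that is
        // looked up from those properties.
        tableEnv.connect(
                new Kafka()
                        .version("universal")
                        .topic("my-topic")
                        .property("bootstrap.servers", "localhost:9092"))
                .withFormat(new Json())
                .withSchema(new Schema()
                        .field("user_id", DataTypes.BIGINT())
                        .field("message", DataTypes.STRING()))
                .createTemporaryTable("my_source");
    }
}
{code}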
Also, instead of introducing a "SourceDescriptor", maybe we could just re-use
the existing {{StreamOperatorFactory}} concept. So treat the above message as a
kind of high-level gist of an idea, not anything concrete.
> Load connector code with separate classloader
> ---------------------------------------------
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Guowei Ma
> Priority: Major
> Labels: usability
>
> Currently, connector code can be seen as part of the user code. Usually, users
> only need to add the corresponding connector as a dependency and package it
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often
> introduce heavy dependencies, so there is a high possibility of class
> conflicts between different connectors, or between a connector and the user
> code of the same job. For example, every week or two we receive issue reports
> about connector class conflicts from our users. The problem gets worse when
> users want to read data from different sources and write output to different
> sinks.
> Using a separate classloader to load each connector's code could resolve
> the problem.