[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048459#comment-17048459
 ] 

Guowei Ma edited comment on FLINK-15786 at 3/2/20 4:21 AM:
-----------------------------------------------------------

I agree that we could focus on separating the connector class loader from the 
user class loader in the Table/SQL stack at first. However, I think we should 
resolve the FlinkKafkaPartitioner problem even if we only focus on the 
Table/SQL stack because the Table/SQL user might provide their own 
FlinkKafkaPartitioner implementation in theory.

 There might be two options:
 # The FlinkKafkaPartitioner could be treated as an SPI. The user could let 
FlinkKafkaPartitioner be loaded by the parent class loader. For example, the 
Kafka connector could be split into two jars, one of which contains the SPI 
for the user to extend. The user could put the Kafka SPI jar in the lib/ 
directory and the other jar in the plugin directory. I think the client side 
could stay as it is. However, IMHO the FlinkKafkaPartitioner SPI differs from 
other Flink SPIs such as the Source/Sink SPI: the Source/Sink SPI is used by 
Flink itself, whereas FlinkKafkaPartitioner is used only by the Flink Kafka 
connector. In my view, it might be a little tricky to let the user put the SPI 
of a specific connector into the parent class loader. This also puts an extra 
burden on the connector developer and on the end user when deploying a job.
 # Putting the customized FlinkKafkaPartitioner and the FlinkKafkaPartitioner 
interface together into the plugin directory. However, this might need some 
changes on the client side, because the user code would reference the 
customized FlinkKafkaPartitioner, which is not on the user classpath. For 
example, we could let the user classpath include the plugin directory on the 
client side. But I do not think it is a good choice for the user code class 
loader to behave differently on the client side and on the task manager side. 
Or we could only allow the user to reference the customized 
FlinkKafkaPartitioner as a string.
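
To illustrate the last variant of option 2 (referencing the partitioner only as a string), here is a minimal sketch in plain Java. The `Partitioner` interface and `HashPartitioner` class are hypothetical stand-ins, not the real FlinkKafkaPartitioner API; the point is only that the client needs the class name, while the class itself is resolved later by whichever class loader can see it.

```java
public class PartitionerByName {

    /** Hypothetical stand-in for the connector's partitioner interface. */
    public interface Partitioner {
        int partition(String key, int numPartitions);
    }

    /** Hypothetical user implementation; in a real job it would live in a user or plugin jar. */
    public static class HashPartitioner implements Partitioner {
        @Override
        public int partition(String key, int numPartitions) {
            // floorMod keeps the result non-negative even for negative hash codes.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
    }

    /** Resolves the class name with the given loader and instantiates it by reflection. */
    public static Partitioner instantiate(String className, ClassLoader loader)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className, true, loader);
        return clazz.asSubclass(Partitioner.class).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Only the name is passed around; the loader decides where the class comes from.
        String name = HashPartitioner.class.getName();
        Partitioner p = instantiate(name, PartitionerByName.class.getClassLoader());
        System.out.println(p.partition("some-key", 4));
    }
}
```

With this shape, the client-side classpath does not need to contain the partitioner implementation, at the cost of losing compile-time checking of the class name.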

 

In general, both options above keep the user class loader and the connector 
class loader orthogonal, which I think is a good choice. However, we need to 
pay attention to how the thread's context class loader is set. Currently, the 
TaskManager sets the FlinkUserCodeClassloader as the context class loader. 
Sometimes the operator needs the connector class loader to load some classes, 
as in FLINK-16262. Sometimes the operator needs the FlinkUserCodeClassloader 
for a customized FlinkKafkaPartitioner. The connector author would have to pay 
close attention to these things.
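
As a sketch of what "paying attention" would mean for a connector author, the usual pattern is to swap the thread's context class loader around the code that needs it and restore it afterwards. The helper below is hypothetical and uses only standard JDK calls; it is not taken from the Flink code base.

```java
import java.util.concurrent.Callable;

public class ContextClassLoaderSwap {

    /**
     * Runs the action with the given loader as the thread's context class loader
     * and restores the previous one afterwards, even if the action throws.
     */
    public static <T> T runWithContextClassLoader(ClassLoader loader, Callable<T> action)
            throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

An operator would have to wrap every call that relies on the connector class loader (or on the user class loader) in such a helper, which is the burden described above.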

 

XClassLoader means a class loader that knows where to find both the user 
classes and the connector classes. For example, we could make 
FlinkUserCodeClassloader contain two types of class loaders: one is the system 
class loader and the other is the connector class loaders, which could come 
from the plugin system. The order of searching for a class is:
 # jars on the user classpath
 # connector jars
 # jars on the system classpath
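
The search order above can be sketched as a small delegating class loader. This is only an illustration under simplifying assumptions: the class name `XClassLoaderSketch` is hypothetical, there is a single connector loader, and resource lookup and class definition are not handled. It is not Flink's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the XClassLoader idea: user first, then connector, then system. */
public class XClassLoaderSketch extends ClassLoader {

    // Loaders consulted in order before falling back to the parent (system) loader.
    private final List<ClassLoader> searchOrder;

    public XClassLoaderSketch(
            ClassLoader userLoader, ClassLoader connectorLoader, ClassLoader systemLoader) {
        super(systemLoader);
        this.searchOrder = Arrays.asList(userLoader, connectorLoader);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (ClassLoader delegate : searchOrder) {
            try {
                return delegate.loadClass(name);
            } catch (ClassNotFoundException e) {
                // Not found by this loader; try the next one in the search order.
            }
        }
        // Last step: the system class path, via the parent loader.
        return super.loadClass(name, resolve);
    }
}
```

For the order to be meaningful, the user and connector loaders passed in should not themselves delegate to the system loader first; otherwise a class on the system classpath would shadow the user's copy.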

 

 

For example, suppose the user implements a FlinkKafkaPartitioner that we call 
DynamicFlinkKafkaPartitioner, and we keep creating the source operator at the 
client. At the client side, the Kafka source class could be found in the 
connector jars by the FlinkUserCodeClassloader. When the 
DynamicFlinkKafkaPartitioner object is created by reflection at the task 
manager side:
 # Find the FlinkKafkaPartitioner class in the connector jars by the 
FlinkUserCodeClassloader.
 # Find the DynamicFlinkKafkaPartitioner class in the user classpath by the 
FlinkUserCodeClassloader.

The advantage of XClassLoader is that there is no additional burden for the 
connector developer or the user. We could always set the XClassLoader as the 
context class loader of the stream task.


was (Author: maguowei):
First of all, I think we could focus on separating the connector class loader 
from the user class loader in the Table/SQL stack.

The FlinkKafkaPartitioner could be treated as an SPI. The user could let 
FlinkKafkaPartitioner be loaded by the parent class loader. For example, the 
Kafka connector could be split into two jars, one of which contains the SPI 
for the user to extend. The user could put the Kafka SPI jar in the lib/ 
directory and the other jar in the plugin directory. So I think this is an 
option to resolve the FlinkKafkaPartitioner problem. I have two small concerns:
 # What is the context class loader of the task thread? Currently, the 
TaskManager sets the FlinkUserCodeClassloader as the context class loader. If 
FlinkUserCodeClassloader does not know the connector jars, there might be 
problems, such as FLINK-16262. Of course, we could set the context class loader 
dynamically if we knew exactly what the operator's behavior is.
 # IMHO, the FlinkKafkaPartitioner SPI differs from other Flink SPIs such as 
the Source/Sink SPI: the Source/Sink SPI is used by Flink itself, whereas 
FlinkKafkaPartitioner is used only by the Flink Kafka connector. In my view, it 
might be a little tricky to let the user put the SPI of a specific connector 
into the parent class loader. This also puts some burden on the connector 
developer and on the end user when deploying a job.

XClassLoader means a class loader that knows where to find both the user 
classes and the connector classes. For example, we could make 
FlinkUserCodeClassloader contain two types of class loaders: one is the system 
class loader and the other is the connector class loaders, which could come 
from the plugin system. The order of searching for a class is:
 # jars on the user classpath
 # connector jars
 # jars on the system classpath

 

For example, suppose the user implements a FlinkKafkaPartitioner that we call 
DynamicFlinkKafkaPartitioner, and we keep creating the source operator at the 
client. At the client side, the Kafka source class could be found in the 
connector jars by the FlinkUserCodeClassloader. When the 
DynamicFlinkKafkaPartitioner object is created by reflection at the task 
manager side:
 # Find the FlinkKafkaPartitioner class in the connector jars by the 
FlinkUserCodeClassloader.
 # Find the DynamicFlinkKafkaPartitioner class in the user classpath by the 
FlinkUserCodeClassloader.

The advantage of XClassLoader is that there is no additional burden for the 
connector developer or the user. We could always set the XClassLoader as the 
context class loader of the stream task.

> Load connector code with separate classloader
> ---------------------------------------------
>
>                 Key: FLINK-15786
>                 URL: https://issues.apache.org/jira/browse/FLINK-15786
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Guowei Ma
>            Priority: Major
>              Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of a class 
> conflict between different connectors or with the user code of the same job. 
> For example, every one or two weeks we receive issue reports related to 
> connector class conflicts from our users. The problem can get worse when 
> users want to analyze data from different sources and write output to 
> different sinks.
> Using a separate classloader to load each connector's code could resolve 
> the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
