[
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048459#comment-17048459
]
Guowei Ma edited comment on FLINK-15786 at 3/2/20 4:36 AM:
-----------------------------------------------------------
I agree that we could first focus on separating the connector class loader from
the user class loader in the Table/SQL stack. However, I think we should
resolve the FlinkKafkaPartitioner problem even if we only focus on the
Table/SQL stack, because a Table/SQL user might, in theory, provide their own
FlinkKafkaPartitioner implementation.
There might be two options:
# The FlinkKafkaPartitioner could be treated as an SPI. The user could have
FlinkKafkaPartitioner loaded by the parent class loader. For example, the Kafka
connector could be split into two jars, one of which contains the SPI for the
user to extend. The user could put the Kafka SPI jar into the lib/ directory
and the other jar into the plugin directory. However, IMHO the
FlinkKafkaPartitioner SPI differs from other Flink SPIs such as the Source/Sink
SPI: the Source/Sink SPI is used by Flink itself, while FlinkKafkaPartitioner
is only used by the Kafka connector. In my view, it is a little tricky to ask
the user to put the SPI of a specific connector into the parent class loader.
This also puts an extra burden on the connector developer and on the end user
when deploying a job.
# Put the customized FlinkKafkaPartitioner and the FlinkKafkaPartitioner
interface together in the plugin directory. However, this might need some
changes on the client side, because the user would reference the customized
FlinkKafkaPartitioner and it is not on the user classpath. For example, the
user classpath could include the plugin directory on the client side. But I
think it is not a good choice for the user code class loader to behave
differently on the client and task manager sides. Alternatively, we could only
allow the user to reference the customized FlinkKafkaPartitioner as a string.
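A minimal sketch of the "reference as a string" idea in option 2, assuming the
class name arrives as plain configuration and that some class loader which can
see the plugin jars is available at that point. The helper name
ByNameInstantiator and its method are hypothetical, not existing Flink API:

```java
/** Hypothetical helper: creates an object from a class name with a given class loader. */
final class ByNameInstantiator {

    private ByNameInstantiator() {}

    /**
     * Loads className through loader, checks that it is a subtype of
     * expectedType, and calls its public no-argument constructor.
     */
    static <T> T instantiate(String className, Class<T> expectedType, ClassLoader loader)
            throws ReflectiveOperationException {
        // The client never links against the class; only the name travels with the job.
        Class<?> raw = Class.forName(className, true, loader);
        // asSubclass throws ClassCastException if the class is not an expectedType.
        return raw.asSubclass(expectedType).getDeclaredConstructor().newInstance();
    }
}
```

With this shape the client only needs the string, so the customized partitioner
would not have to be on the client-side user classpath.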
In general, the two options above keep the user and connector class loaders
orthogonal, which I think is a good choice. However, we need to pay attention
to how the thread's context class loader is set. Currently, the TaskManager
sets the FlinkUserCodeClassloader as the context class loader. Sometimes the
operator needs the connector class loader for some classes, as in FLINK-16262.
Sometimes the operator needs the FlinkUserCodeClassloader for a customized
FlinkKafkaPartitioner. The connector author has to pay close attention to
these things.
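To illustrate the kind of care the connector author would need, here is a
minimal sketch of temporarily switching the thread's context class loader
around a call and restoring it afterwards. The class ContextClassLoaders and
the method callWith are hypothetical names used only for this example:

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: runs an action with a specific context class loader. */
final class ContextClassLoaders {

    private ContextClassLoaders() {}

    /** Sets loader as the context class loader, runs action, then restores the old one. */
    static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Always restore, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}
```

A connector would wrap calls that need the connector class loader in one such
call and calls that need the user code class loader in another.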
XClassLoader means a class loader that knows where to find both the user
classes and the connector classes. For example, we could make
FlinkUserCodeClassloader contain two types of class loaders: one is the system
class loader and the other is the connector class loaders, which could come
from the plugin system. The order of searching for a class is:
# jars on the user classpath
# the connector jars
# jars on the system classpath
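The search order above could be sketched roughly as follows. This is only an
illustration under assumptions: CompositeClassLoader is a hypothetical name,
and each delegate is assumed to see only its own jars (a delegate that itself
delegates to the system class loader first would change the effective order):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: asks the user, connector, and system loaders in that order. */
final class CompositeClassLoader extends ClassLoader {

    private final List<ClassLoader> searchOrder = new ArrayList<>();

    CompositeClassLoader(
            ClassLoader userLoader,
            List<ClassLoader> connectorLoaders,
            ClassLoader systemLoader) {
        super(null); // no implicit parent; the order is decided in loadClass below
        searchOrder.add(userLoader);
        searchOrder.addAll(connectorLoaders);
        searchOrder.add(systemLoader);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (ClassLoader delegate : searchOrder) {
            try {
                // The first delegate that knows the class wins.
                return delegate.loadClass(name);
            } catch (ClassNotFoundException ignored) {
                // Not visible to this delegate; try the next one.
            }
        }
        throw new ClassNotFoundException(name);
    }
}
```

Because every lookup goes through one loader, it could be set as the context
class loader without the operator having to choose between loaders.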
For example, the user implements a FlinkKafkaPartitioner, which we could call
DynamicFlinkKafkaPartitioner, and we keep creating the source operator at the
client. On the client side, the FlinkUserCodeClassloader could find the Kafka
source class in the connector jars. When the DynamicFlinkKafkaPartitioner
object is created by reflection on the task manager side:
# The FlinkUserCodeClassloader finds the FlinkKafkaPartitioner class in the
connector jars.
# The FlinkUserCodeClassloader finds the DynamicFlinkKafkaPartitioner class on
the user classpath.
The advantage of XClassLoader is that there is no additional burden for the
connector developer or the user. We could always set the XClassLoader as the
context class loader of the stream task.
> Load connector code with separate classloader
> ---------------------------------------------
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Guowei Ma
> Priority: Major
> Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users
> only need to add the corresponding connector as a dependency and package it
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often
> introduce heavy dependencies, so there is a high probability of class
> conflicts between different connectors or with the user code of the same job.
> For example, every one or two weeks we receive issue reports related to
> connector class conflicts from our users. The problem can get worse when
> users want to analyze data from different sources and write output to
> different sinks.
> Using a separate classloader to load each connector's code could resolve
> the problem.