baisui1981 commented on pull request #17521:
URL: https://github.com/apache/flink/pull/17521#issuecomment-953619514


   > I don't think it is a good idea to give users direct access to this part 
of the code. It just yet again increases the API surface, and for some _very_ 
important internal thing that we need to be able to change at a whim.
   > 
   > Furthermore, I don't see yet how this would solve the issue at hand. The 
proposed interface provides no differentiating factor that could be used to 
create different classloaders for each task (like the Task ID). Even then, the 
classloader is shared across different tasks running on the same TM, so it must 
behave the same way? Given that they all have access to the same jars, I'm 
curious how the behavior is supposed to be different in the first place.
   > 
   > All in all, I think this needs way more discussion.
   
   Hi @zentol, thanks for your reply. We are building a data center product based on Flink and want to integrate various third-party components provided as Flink `SourceFunction`s (like the [various CDC source connectors](https://github.com/ververica/flink-cdc-connectors)) and `SinkFunction`s (like [elasticsearch7](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch7)), and so on. Doing so inevitably runs into the problem described in [FLINK-24558](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues).
   To solve this problem, I intend to add an extension point for customizing `org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory`. The main motivation is **making the parent classloader replaceable with a parent classloader that I designate**, so that loading of client-side classes packaged in multiple plugin bundles can be delegated to it.
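   A minimal sketch of that extension point, assuming nothing beyond the JDK. The `ClassLoaderFactory` interface is redeclared locally with the same `createClassLoader(URL[] libraryURLs)` method that the code further down overrides, and `DesignatedParentClassLoaderFactory` is a hypothetical name for a factory whose parent classloader is chosen by the caller instead of being fixed.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Same method shape as Flink's BlobLibraryCacheManager.ClassLoaderFactory,
// redeclared here so the sketch compiles without Flink on the classpath.
interface ClassLoaderFactory {
    URLClassLoader createClassLoader(URL[] libraryURLs);
}

// Hypothetical factory: the parent is injected instead of hard-coded, so it can
// be a loader that already sees the classes packaged in the plugin bundles.
final class DesignatedParentClassLoaderFactory implements ClassLoaderFactory {
    private final ClassLoader designatedParent;

    DesignatedParentClassLoaderFactory(ClassLoader designatedParent) {
        this.designatedParent = designatedParent;
    }

    @Override
    public URLClassLoader createClassLoader(URL[] libraryURLs) {
        // Default URLClassLoader delegation: ask the designated parent first,
        // then fall back to the job's own library URLs.
        return new URLClassLoader(libraryURLs, designatedParent);
    }
}
```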
   
   In my project I have written a class [TISFlinClassLoaderFactory](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinClassLoaderFactory.java) that implements the newly introduced `ClassLoaderFactoryBuilder` interface, and registered it with a [service meta configuration](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder). I then package the artifact and put it in **$FLINK_HOME/lib**.
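   For context, a sketch of how such a builder could be discovered, assuming the lookup uses `java.util.ServiceLoader`, which is what a `META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder` file (one fully qualified implementation class name per line) is meant for. The single-method `ClassLoaderFactoryBuilder` below is a simplified hypothetical stand-in, not the interface from the PR, and `DefaultBuilder` and `BuilderLookup` are illustrative names.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical, simplified stand-in for the proposed ClassLoaderFactoryBuilder.
interface ClassLoaderFactoryBuilder {
    String name();
}

// Fallback used when no provider is registered.
final class DefaultBuilder implements ClassLoaderFactoryBuilder {
    @Override
    public String name() {
        return "default";
    }
}

final class BuilderLookup {
    // Returns the first provider listed in a META-INF/services file named after
    // ClassLoaderFactoryBuilder's fully qualified name on the given loader's
    // classpath (for example a jar in $FLINK_HOME/lib), else the default.
    static ClassLoaderFactoryBuilder load(ClassLoader classLoader) {
        Iterator<ClassLoaderFactoryBuilder> providers =
                ServiceLoader.load(ClassLoaderFactoryBuilder.class, classLoader).iterator();
        return providers.hasNext() ? providers.next() : new DefaultBuilder();
    }
}
```

   Falling back to a default keeps the behavior unchanged for deployments that do not ship any provider jar.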
   
   > The proposed interface provides no differentiating factor that could be 
used to create different classloaders for each task (like the Task ID)
   
   Thanks for the reminder, @zentol. Maybe I should keep the parent classloader unchangeable since, as you say, the classloader is shared across different tasks running on the same TM. Instead, extending `FlinkUserCodeClassLoader` directly would be better, for example: [TISChildFirstClassLoader](https://github.com/qlangtech/plugins/blob/v3.1.0/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISChildFirstClassLoader.java)
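   To illustrate the child-first idea without depending on Flink internals, here is a plain-JDK sketch. `ChildFirstSketchClassLoader` is a hypothetical class, not Flink's `FlinkUserCodeClassLoader` or `TISChildFirstClassLoader`: class names matching one of the `alwaysParentFirstPatterns` prefixes are delegated to the parent as usual, and everything else is looked up in the loader's own URLs before falling back to the parent.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first loader in plain JDK terms (not Flink's implementation).
class ChildFirstSketchClassLoader extends URLClassLoader {
    private final String[] alwaysParentFirstPatterns;

    ChildFirstSketchClassLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) {
        super(urls, parent);
        this.alwaysParentFirstPatterns = alwaysParentFirstPatterns.clone();
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // e.g. JDK or Flink classes: normal parent-first delegation
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // child-first: look in this loader's own URLs before the parent
                        c = findClass(name);
                    } catch (ClassNotFoundException notInOwnUrls) {
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Plain prefix match against the configured parent-first patterns.
    boolean isParentFirst(String className) {
        for (String prefix : alwaysParentFirstPatterns) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```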
   
   The plugin inventory (the **differentiating factor that could be used to create different classloaders**) is stored in the manifest of the jar submitted from the Flink client side, which reaches the server side as the `libraryURLs` parameter. On the server side, the `plugin inventory` is extracted by parsing the jar manifest and is then passed as a parameter to initialize the `PluginManager`, which creates the uberClassLoader.
   ```java
   // TISFlinClassLoaderFactory.java

       public BlobLibraryCacheManager.ClassLoaderFactory buildServerLoaderFactory(
               FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
               String[] alwaysParentFirstPatterns,
               @Nullable Consumer<Throwable> exceptionHandler,
               boolean checkClassLoaderLeak) {

           return new BlobLibraryCacheManager.DefaultClassLoaderFactory(
                   classLoaderResolveOrder, alwaysParentFirstPatterns, exceptionHandler, checkClassLoaderLeak) {

               @Override
               public URLClassLoader createClassLoader(URL[] libraryURLs) {
                   try {
                       PluginManager pluginManager = TIS.get().getPluginManager();
                       // The client submits exactly one jar; its manifest carries the plugin inventory.
                       if (libraryURLs.length != 1) {
                           throw new IllegalStateException(
                                   "length of libraryURLs must be 1, but now is: " + libraryURLs.length);
                       }
                       for (URL lib : libraryURLs) {
                           try (JarInputStream jarReader = new JarInputStream(lib.openStream())) {
                               // getManifest() returns null for a jar without a manifest
                               Manifest manifest = jarReader.getManifest();
                               Attributes pluginInventory =
                                       (manifest == null) ? null : manifest.getAttributes("plugin_inventory");
                               if (pluginInventory == null) {
                                   throw new IllegalStateException("plugin inventory can not be empty in lib: " + lib);
                               }
                               // Each key of the "plugin_inventory" section names a plugin to load.
                               for (Map.Entry<Object, Object> pluginDesc : pluginInventory.entrySet()) {
                                   pluginManager.dynamicLoadPlugin(String.valueOf(pluginDesc.getKey()));
                               }
                           }
                       }
                       // Hand the plugin uber classloader to the TIS child-first loader.
                       return new TISChildFirstClassLoader(pluginManager.uberClassLoader, libraryURLs,
                               this.getParentClassLoader(), this.alwaysParentFirstPatterns,
                               this.classLoadingExceptionHandler);
                   } catch (IOException e) {
                       throw new RuntimeException(e);
                   }
               }
           };
       }
   ```
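   The manifest round trip described above can be exercised on its own, without Flink or TIS. In this sketch `PluginInventoryReader` is a hypothetical helper; the section name `plugin_inventory` comes from the code above, while writing each plugin name as an attribute key with the placeholder value `true` is an assumption about how the client side builds the jar.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

final class PluginInventoryReader {
    static final String SECTION = "plugin_inventory";

    // Server side: collect the keys of the "plugin_inventory" manifest section.
    static List<String> readInventory(InputStream jar) throws IOException {
        try (JarInputStream in = new JarInputStream(jar)) {
            Manifest manifest = in.getManifest(); // null if the jar has no manifest
            Attributes section = (manifest == null) ? null : manifest.getAttributes(SECTION);
            if (section == null) {
                throw new IllegalStateException("plugin inventory missing");
            }
            List<String> plugins = new ArrayList<>();
            for (Map.Entry<Object, Object> entry : section.entrySet()) {
                plugins.add(String.valueOf(entry.getKey()));
            }
            return plugins;
        }
    }

    // Client side (hypothetical): an in-memory jar whose manifest lists the plugins.
    static byte[] writeJar(List<String> plugins) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        Attributes section = new Attributes();
        for (String plugin : plugins) {
            section.putValue(plugin, "true"); // placeholder value; only the key is read
        }
        manifest.getEntries().put(SECTION, section);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // The manifest is the jar's only entry; closing writes it out.
        new JarOutputStream(bytes, manifest).close();
        return bytes.toByteArray();
    }
}
```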
   @zentol, what do you think? I would appreciate any suggestions, thanks.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
