jerrypeng commented on a change in pull request #9500:
URL: https://github.com/apache/pulsar/pull/9500#discussion_r572351249



##########
File path: 
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
##########
@@ -84,34 +95,82 @@
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
         this.narExtractionDirectory = narExtractionDirectory;
-        this.javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig,
-                fnCache,
-                jarFile,
-                pulsarClient,
-                pulsarAdmin,
-                stateStorageServiceUrl,
-                secretsProvider,
-                collectorRegistry,
-                narExtractionDirectory);
+        this.connectorsManager = connectorsManager;
+    }
+
+    private static ClassLoader getFunctionClassLoader(InstanceConfig 
instanceConfig,
+                                                      String jarFile,
+                                                      String 
narExtractionDirectory,
+                                                      FunctionCacheManager 
fnCache,
+                                                      
Optional<ConnectorsManager> connectorsManager) throws Exception {
+
+        if 
(FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())
+                && connectorsManager.isPresent()) {
+            switch 
(InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) {
+                case SOURCE:
+                    return connectorsManager.get().getConnector(
+                            
instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
+                case SINK:
+                    return connectorsManager.get().getConnector(
+                            
instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
+                default:
+                    return loadJars(jarFile, instanceConfig, 
narExtractionDirectory, fnCache);
+            }
+        } else {
+            return loadJars(jarFile, instanceConfig, narExtractionDirectory, 
fnCache);
+        }
+    }
+
+    private static ClassLoader loadJars(String jarFile,
+                                 InstanceConfig instanceConfig,
+                                 String narExtractionDirectory,
+                                 FunctionCacheManager fnCache) throws 
Exception {
+        ClassLoader fnClassLoader;
+        try {
+            log.info("Load JAR: {}", jarFile);
+            // Let's first try to treat it as a nar archive
+            fnCache.registerFunctionInstanceWithArchive(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName(),
+                    jarFile, narExtractionDirectory);
+        } catch (FileNotFoundException e) {
+            // create the function class loader
+            fnCache.registerFunctionInstance(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName(),
+                    Arrays.asList(jarFile),
+                    Collections.emptyList());
+        }
+
+        log.info("Initialize function class loader for function {} at function 
cache manager, functionClassLoader: {}",
+                instanceConfig.getFunctionDetails().getName(), 
fnCache.getClassLoader(instanceConfig.getFunctionId()));
+
+        fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+        if (null == fnClassLoader) {
+            throw new Exception("No function class loader available.");
+        }

Review comment:
       That is the existing logic.  I would rather not change it in this PR.  It is 
kind of out of scope.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to