jerrypeng commented on a change in pull request #9500:
URL: https://github.com/apache/pulsar/pull/9500#discussion_r572351249
##########
File path:
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
##########
@@ -84,34 +95,82 @@
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
- this.javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig,
- fnCache,
- jarFile,
- pulsarClient,
- pulsarAdmin,
- stateStorageServiceUrl,
- secretsProvider,
- collectorRegistry,
- narExtractionDirectory);
+ this.connectorsManager = connectorsManager;
+ }
+
+ private static ClassLoader getFunctionClassLoader(InstanceConfig
instanceConfig,
+ String jarFile,
+ String
narExtractionDirectory,
+ FunctionCacheManager
fnCache,
+
Optional<ConnectorsManager> connectorsManager) throws Exception {
+
+ if
(FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())
+ && connectorsManager.isPresent()) {
+ switch
(InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) {
+ case SOURCE:
+ return connectorsManager.get().getConnector(
+
instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
+ case SINK:
+ return connectorsManager.get().getConnector(
+
instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
+ default:
+ return loadJars(jarFile, instanceConfig,
narExtractionDirectory, fnCache);
+ }
+ } else {
+ return loadJars(jarFile, instanceConfig, narExtractionDirectory,
fnCache);
+ }
+ }
+
+ private static ClassLoader loadJars(String jarFile,
+ InstanceConfig instanceConfig,
+ String narExtractionDirectory,
+ FunctionCacheManager fnCache) throws
Exception {
+ ClassLoader fnClassLoader;
+ try {
+ log.info("Load JAR: {}", jarFile);
+ // Let's first try to treat it as a nar archive
+ fnCache.registerFunctionInstanceWithArchive(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName(),
+ jarFile, narExtractionDirectory);
+ } catch (FileNotFoundException e) {
+ // create the function class loader
+ fnCache.registerFunctionInstance(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName(),
+ Arrays.asList(jarFile),
+ Collections.emptyList());
+ }
+
+ log.info("Initialize function class loader for function {} at function
cache manager, functionClassLoader: {}",
+ instanceConfig.getFunctionDetails().getName(),
fnCache.getClassLoader(instanceConfig.getFunctionId()));
+
+ fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+ if (null == fnClassLoader) {
+ throw new Exception("No function class loader available.");
+ }
Review comment:
That is the existing logic. I would rather not change it in this PR,
since it is somewhat out of scope.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]