This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 05e57dd3a44 [fix][fn] Fix JavaInstanceStarter inferring type class 
name error (#19896)
05e57dd3a44 is described below

commit 05e57dd3a443c5b99c21054c56a1b497455fa867
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed May 24 16:41:52 2023 +0800

    [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
---
 .../functions/runtime/JavaInstanceStarter.java     | 19 +++++++++++-----
 .../functions/runtime/thread/ThreadRuntime.java    | 19 +++++++++-------
 .../runtime/thread/ThreadRuntimeFactory.java       | 26 ++++++++++++++++++----
 3 files changed, 47 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index c4f44be3df3..33e837d66e2 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -49,10 +49,13 @@ import 
org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 
 
 @Slf4j
@@ -192,7 +195,10 @@ public class JavaInstanceStarter implements AutoCloseable {
             functionDetailsJsonString = functionDetailsJsonString.substring(0, 
functionDetailsJsonString.length() - 1);
         }
         JsonFormat.parser().merge(functionDetailsJsonString, 
functionDetailsBuilder);
-        inferringMissingTypeClassName(functionDetailsBuilder, 
functionInstanceClassLoader);
+        FunctionCacheManager fnCache = new 
FunctionCacheManagerImpl(rootClassLoader);
+        ClassLoader functionClassLoader = ThreadRuntime.loadJars(jarFile, 
instanceConfig, functionId,
+                functionDetailsBuilder.getName(), narExtractionDirectory, 
fnCache);
+        inferringMissingTypeClassName(functionDetailsBuilder, 
functionClassLoader);
         Function.FunctionDetails functionDetails = 
functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
@@ -237,7 +243,7 @@ public class JavaInstanceStarter implements AutoCloseable {
                         
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                 secretsProvider, collectorRegistry, narExtractionDirectory, 
rootClassLoader,
-                exposePulsarAdminClientEnabled, webServiceUrl);
+                exposePulsarAdminClientEnabled, webServiceUrl, fnCache);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -329,7 +335,8 @@ public class JavaInstanceStarter implements AutoCloseable {
                     Map<String, Object> userConfigs = new 
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
                             new TypeToken<Map<String, Object>>() {
                             }.getType());
-                    boolean isWindowConfigPresent = 
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+                    boolean isWindowConfigPresent =
+                            userConfigs != null && 
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
                     String className = functionDetailsBuilder.getClassName();
                     if (isWindowConfigPresent) {
                         WindowConfig windowConfig = new Gson().fromJson(
@@ -360,7 +367,8 @@ public class JavaInstanceStarter implements AutoCloseable {
             case SINK:
                 if ((functionDetailsBuilder.hasSink()
                         && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
-                    String typeArg = 
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+                    String typeArg =
+                            
getSinkType(functionDetailsBuilder.getSink().getClassName(), 
classLoader).getName();
 
                     Function.SinkSpec.Builder sinkBuilder =
                             
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -378,7 +386,8 @@ public class JavaInstanceStarter implements AutoCloseable {
             case SOURCE:
                 if ((functionDetailsBuilder.hasSource()
                         && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
-                    String typeArg = 
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+                    String typeArg =
+                            
getSourceType(functionDetailsBuilder.getSource().getClassName(), 
classLoader).getName();
 
                     Function.SourceSpec.Builder sourceBuilder =
                             
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 0aa0cd95aef..ed128568bcf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -137,14 +137,16 @@ public class ThreadRuntime implements Runtime {
                         .getClassLoader();
             }
         }
-        return loadJars(jarFile, instanceConfig, functionId, 
narExtractionDirectory, fnCache);
+        return loadJars(jarFile, instanceConfig, functionId, 
instanceConfig.getFunctionDetails().getName(),
+                narExtractionDirectory, fnCache);
     }
 
-    private static ClassLoader loadJars(String jarFile,
-                                 InstanceConfig instanceConfig,
-                                 String functionId,
-                                 String narExtractionDirectory,
-                                 FunctionCacheManager fnCache) throws 
Exception {
+    public static ClassLoader loadJars(String jarFile,
+                                       InstanceConfig instanceConfig,
+                                       String functionId,
+                                       String functionName,
+                                       String narExtractionDirectory,
+                                       FunctionCacheManager fnCache) throws 
Exception {
         if (jarFile == null) {
             return Thread.currentThread().getContextClassLoader();
         }
@@ -175,8 +177,9 @@ public class ThreadRuntime implements Runtime {
                     Collections.emptyList());
         }
 
-        log.info("Initialize function class loader for function {} at function 
cache manager, functionClassLoader: {}",
-                instanceConfig.getFunctionDetails().getName(), 
fnCache.getClassLoader(functionId));
+        log.info(
+                "Initialize function class loader for function {} at function 
cache manager, functionClassLoader: {}",
+                functionName, fnCache.getClassLoader(functionId));
 
         fnClassLoader = fnCache.getClassLoader(functionId);
         if (null == fnClassLoader) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 7bc055b25d6..cb9ad27a2df 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -86,7 +86,21 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                 stateStorageImplClass, storageServiceUrl, null, 
secretsProvider, collectorRegistry,
                 narExtractionDirectory,
                 rootClassLoader, exposePulsarAdminClientEnabled, 
pulsarWebServiceUrl, Optional.empty(),
-                Optional.empty());
+                Optional.empty(), null);
+    }
+
+    public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl,
+                                String stateStorageImplClass,
+                                String storageServiceUrl,
+                                AuthenticationConfig authConfig, 
SecretsProvider secretsProvider,
+                                FunctionCollectorRegistry collectorRegistry, 
String narExtractionDirectory,
+                                ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled,
+                                String pulsarWebServiceUrl, 
FunctionCacheManager fnCache) throws Exception {
+        initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, 
authConfig,
+                stateStorageImplClass, storageServiceUrl, null, 
secretsProvider, collectorRegistry,
+                narExtractionDirectory,
+                rootClassLoader, exposePulsarAdminClientEnabled, 
pulsarWebServiceUrl, Optional.empty(),
+                Optional.empty(), fnCache);
     }
 
     private void initialize(String threadGroupName, 
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
@@ -96,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                             FunctionCollectorRegistry collectorRegistry, 
String narExtractionDirectory,
                             ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled,
                             String pulsarWebServiceUrl, 
Optional<ConnectorsManager> connectorsManager,
-                            Optional<FunctionsManager> functionsManager)
+                            Optional<FunctionsManager> functionsManager, 
FunctionCacheManager fnCache)
             throws PulsarClientException {
 
         if (rootClassLoader == null) {
@@ -106,7 +120,10 @@ public class ThreadRuntimeFactory implements 
RuntimeFactory {
         this.rootClassLoader = rootClassLoader;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.defaultSecretsProvider = secretsProvider;
-        this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        this.fnCache = fnCache;
+        if (fnCache == null) {
+            this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        }
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarAdmin =
                 exposePulsarAdminClientEnabled ? 
InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
@@ -171,7 +188,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
                 workerConfig.getStateStorageServiceUrl(), 
secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null,
                 workerConfig.isExposeAdminClientEnabled(),
-                workerConfig.getPulsarWebServiceUrl(), 
Optional.of(connectorsManager), Optional.of(functionsManager));
+                workerConfig.getPulsarWebServiceUrl(), 
Optional.of(connectorsManager), Optional.of(functionsManager),
+                null);
     }
 
     @Override

Reply via email to