This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 05e57dd3a44 [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
05e57dd3a44 is described below
commit 05e57dd3a443c5b99c21054c56a1b497455fa867
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed May 24 16:41:52 2023 +0800
[fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
---
.../functions/runtime/JavaInstanceStarter.java | 19 +++++++++++-----
.../functions/runtime/thread/ThreadRuntime.java | 19 +++++++++-------
.../runtime/thread/ThreadRuntimeFactory.java | 26 ++++++++++++++++++----
3 files changed, 47 insertions(+), 17 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index c4f44be3df3..33e837d66e2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -49,10 +49,13 @@ import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
@Slf4j
@@ -192,7 +195,10 @@ public class JavaInstanceStarter implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0,
functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString,
functionDetailsBuilder);
- inferringMissingTypeClassName(functionDetailsBuilder,
functionInstanceClassLoader);
+ FunctionCacheManager fnCache = new
FunctionCacheManagerImpl(rootClassLoader);
+ ClassLoader functionClassLoader = ThreadRuntime.loadJars(jarFile,
instanceConfig, functionId,
+ functionDetailsBuilder.getName(), narExtractionDirectory,
fnCache);
+ inferringMissingTypeClassName(functionDetailsBuilder,
functionClassLoader);
Function.FunctionDetails functionDetails =
functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
@@ -237,7 +243,7 @@ public class JavaInstanceStarter implements AutoCloseable {
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
secretsProvider, collectorRegistry, narExtractionDirectory,
rootClassLoader,
- exposePulsarAdminClientEnabled, webServiceUrl);
+ exposePulsarAdminClientEnabled, webServiceUrl, fnCache);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
@@ -329,7 +335,8 @@ public class JavaInstanceStarter implements AutoCloseable {
Map<String, Object> userConfigs = new
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
new TypeToken<Map<String, Object>>() {
}.getType());
- boolean isWindowConfigPresent =
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+ boolean isWindowConfigPresent =
+ userConfigs != null &&
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
String className = functionDetailsBuilder.getClassName();
if (isWindowConfigPresent) {
WindowConfig windowConfig = new Gson().fromJson(
@@ -360,7 +367,8 @@ public class JavaInstanceStarter implements AutoCloseable {
case SINK:
if ((functionDetailsBuilder.hasSink()
&&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
- String typeArg =
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+ String typeArg =
+
getSinkType(functionDetailsBuilder.getSink().getClassName(),
classLoader).getName();
Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -378,7 +386,8 @@ public class JavaInstanceStarter implements AutoCloseable {
case SOURCE:
if ((functionDetailsBuilder.hasSource()
&&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
- String typeArg =
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+ String typeArg =
+
getSourceType(functionDetailsBuilder.getSource().getClassName(),
classLoader).getName();
Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 0aa0cd95aef..ed128568bcf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -137,14 +137,16 @@ public class ThreadRuntime implements Runtime {
.getClassLoader();
}
}
- return loadJars(jarFile, instanceConfig, functionId,
narExtractionDirectory, fnCache);
+ return loadJars(jarFile, instanceConfig, functionId,
instanceConfig.getFunctionDetails().getName(),
+ narExtractionDirectory, fnCache);
}
- private static ClassLoader loadJars(String jarFile,
- InstanceConfig instanceConfig,
- String functionId,
- String narExtractionDirectory,
- FunctionCacheManager fnCache) throws
Exception {
+ public static ClassLoader loadJars(String jarFile,
+ InstanceConfig instanceConfig,
+ String functionId,
+ String functionName,
+ String narExtractionDirectory,
+ FunctionCacheManager fnCache) throws
Exception {
if (jarFile == null) {
return Thread.currentThread().getContextClassLoader();
}
@@ -175,8 +177,9 @@ public class ThreadRuntime implements Runtime {
Collections.emptyList());
}
- log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
- instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(functionId));
+ log.info(
+ "Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+ functionName, fnCache.getClassLoader(functionId));
fnClassLoader = fnCache.getClassLoader(functionId);
if (null == fnClassLoader) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 7bc055b25d6..cb9ad27a2df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -86,7 +86,21 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
stateStorageImplClass, storageServiceUrl, null,
secretsProvider, collectorRegistry,
narExtractionDirectory,
rootClassLoader, exposePulsarAdminClientEnabled,
pulsarWebServiceUrl, Optional.empty(),
- Optional.empty());
+ Optional.empty(), null);
+ }
+
+ public ThreadRuntimeFactory(String threadGroupName, String
pulsarServiceUrl,
+ String stateStorageImplClass,
+ String storageServiceUrl,
+ AuthenticationConfig authConfig,
SecretsProvider secretsProvider,
+ FunctionCollectorRegistry collectorRegistry,
String narExtractionDirectory,
+ ClassLoader rootClassLoader, boolean
exposePulsarAdminClientEnabled,
+ String pulsarWebServiceUrl,
FunctionCacheManager fnCache) throws Exception {
+ initialize(threadGroupName, Optional.empty(), pulsarServiceUrl,
authConfig,
+ stateStorageImplClass, storageServiceUrl, null,
secretsProvider, collectorRegistry,
+ narExtractionDirectory,
+ rootClassLoader, exposePulsarAdminClientEnabled,
pulsarWebServiceUrl, Optional.empty(),
+ Optional.empty(), fnCache);
}
private void initialize(String threadGroupName,
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
@@ -96,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
FunctionCollectorRegistry collectorRegistry,
String narExtractionDirectory,
ClassLoader rootClassLoader, boolean
exposePulsarAdminClientEnabled,
String pulsarWebServiceUrl,
Optional<ConnectorsManager> connectorsManager,
- Optional<FunctionsManager> functionsManager)
+ Optional<FunctionsManager> functionsManager,
FunctionCacheManager fnCache)
throws PulsarClientException {
if (rootClassLoader == null) {
@@ -106,7 +120,10 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
this.rootClassLoader = rootClassLoader;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.defaultSecretsProvider = secretsProvider;
- this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ this.fnCache = fnCache;
+ if (fnCache == null) {
+ this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ }
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarAdmin =
exposePulsarAdminClientEnabled ?
InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
@@ -171,7 +188,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
workerConfig.getStateStorageServiceUrl(),
secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null,
workerConfig.isExposeAdminClientEnabled(),
- workerConfig.getPulsarWebServiceUrl(),
Optional.of(connectorsManager), Optional.of(functionsManager));
+ workerConfig.getPulsarWebServiceUrl(),
Optional.of(connectorsManager), Optional.of(functionsManager),
+ null);
}
@Override