This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 359cfa7bc05 Factorize code in LocalRunner (#17400)
359cfa7bc05 is described below
commit 359cfa7bc05775bf6dd004f21b9907610ed3b3d5
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Sep 3 09:18:01 2022 +0200
Factorize code in LocalRunner (#17400)
---
.../org/apache/pulsar/functions/LocalRunner.java | 200 +++++++++------------
1 file changed, 84 insertions(+), 116 deletions(-)
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 6c7551f8ac3..a308c98b3da 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
@@ -59,6 +60,7 @@ import
org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
+import
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -337,47 +339,10 @@ public class LocalRunner implements AutoCloseable {
parallelism = functionConfig.getParallelism();
if (functionConfig.getRuntime() ==
FunctionConfig.Runtime.JAVA) {
userCodeFile = functionConfig.getJar();
- ClassLoader builtInFunctionClassLoader = userCodeFile !=
null
- ? isBuiltInFunction(userCodeFile)
- : null;
- if (builtInFunctionClassLoader != null) {
- userCodeClassLoader = builtInFunctionClassLoader;
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
-
FunctionConfigUtils.validateJavaFunction(functionConfig,
builtInFunctionClassLoader));
- } else if (userCodeFile != null &&
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file =
FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader functionClassLoader =
FunctionCommon.getClassLoaderFromPackage(
-
Function.FunctionDetails.ComponentType.FUNCTION,
- functionConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
-
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
- userCodeClassLoader = functionClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("User jar does not
exist");
- }
- ClassLoader functionClassLoader =
FunctionCommon.getClassLoaderFromPackage(
-
Function.FunctionDetails.ComponentType.FUNCTION,
- functionConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
-
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
- userCodeClassLoader = functionClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv ==
RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The jar property
must be specified in FunctionConfig.");
- }
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(
- functionConfig,
-
Thread.currentThread().getContextClassLoader()));
- }
+ ClassLoader functionClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.FUNCTION,
functionConfig.getClassName());
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
} else if (functionConfig.getRuntime() ==
FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() ==
FunctionConfig.Runtime.PYTHON) {
@@ -395,85 +360,18 @@ public class LocalRunner implements AutoCloseable {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
-
- ClassLoader builtInSourceClassLoader = userCodeFile != null ?
isBuiltInSource(userCodeFile) : null;
- if (builtInSourceClassLoader != null) {
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
SourceConfigUtils.validateAndExtractDetails(
- sourceConfig, builtInSourceClassLoader,
true));
- userCodeClassLoader = builtInSourceClassLoader;
- } else if (userCodeFile != null &&
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file =
FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader sourceClassLoader =
FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SOURCE,
- sourceConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
-
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader,
true));
- userCodeClassLoader = sourceClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Source archive (" +
userCodeFile + ") does not exist");
- }
- ClassLoader sourceClassLoader =
FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SOURCE,
- sourceConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = SourceConfigUtils.convert(sourceConfig,
-
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader,
true));
- userCodeClassLoader = sourceClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv ==
RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The archive property
must be specified in SourceConfig.");
- }
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
SourceConfigUtils.validateAndExtractDetails(
- sourceConfig,
Thread.currentThread().getContextClassLoader(), true));
- }
+ ClassLoader sourceClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.SOURCE,
sourceConfig.getClassName());
+ functionDetails = SourceConfigUtils.convert(
+ sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader,
true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
parallelism = sinkConfig.getParallelism();
-
- ClassLoader builtInSinkClassLoader = userCodeFile != null ?
isBuiltInSink(userCodeFile) : null;
- if (builtInSinkClassLoader != null) {
- functionDetails = SinkConfigUtils.convert(
- sinkConfig,
SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, builtInSinkClassLoader, null,
true));
- userCodeClassLoader = builtInSinkClassLoader;
- } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file =
FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader sinkClassLoader =
FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SINK,
- sinkConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = SinkConfigUtils.convert(
- sinkConfig,
-
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null,
true));
- userCodeClassLoader = sinkClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Sink archive does not
exist");
- }
- ClassLoader sinkClassLoader =
FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SINK,
- sinkConfig.getClassName(), file,
narExtractionDirectory);
- functionDetails = SinkConfigUtils.convert(
- sinkConfig,
-
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null,
true));
- userCodeClassLoader = sinkClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv ==
RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The archive property
must be specified in SourceConfig.");
- }
- functionDetails = SinkConfigUtils.convert(
- sinkConfig,
SinkConfigUtils.validateAndExtractDetails(
- sinkConfig,
Thread.currentThread().getContextClassLoader(), null, true));
- }
+ ClassLoader sinkClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.SINK,
sinkConfig.getClassName());
+ functionDetails = SinkConfigUtils.convert(
+ sinkConfig,
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null,
true));
} else {
throw new IllegalArgumentException("Must specify Function,
Source or Sink config");
}
@@ -528,6 +426,62 @@ public class LocalRunner implements AutoCloseable {
}
}
+ private ClassLoader extractClassLoader(String userCodeFile, ComponentType
componentType, String className)
+ throws IOException, URISyntaxException {
+ userCodeClassLoader = userCodeFile != null ? isBuiltIn(userCodeFile,
componentType) : null;
+ if (userCodeClassLoader == null) {
+ if (userCodeFile != null &&
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+ File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+ userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ componentType, className, file,
narExtractionDirectory);
+ userCodeClassLoaderCreated = true;
+ } else if (userCodeFile != null) {
+ File file = new File(userCodeFile);
+ if (!file.exists()) {
+ String errorMsg;
+ switch (componentType) {
+ case FUNCTION:
+ errorMsg = "User jar";
+ break;
+ case SOURCE:
+ errorMsg = "Source archive";
+ break;
+ case SINK:
+ errorMsg = "Sink archive";
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value:
" + componentType);
+ }
+ throw new RuntimeException(errorMsg + " (" + userCodeFile
+ ") does not exist");
+ }
+ userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ componentType, className, file,
narExtractionDirectory);
+ userCodeClassLoaderCreated = true;
+ } else {
+ if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
+ String errorMsg;
+ switch (componentType) {
+ case FUNCTION:
+ errorMsg = "The jar property must be specified in
FunctionConfig.";
+ break;
+ case SOURCE:
+ errorMsg = "The archive property must be specified
in SourceConfig.";
+ break;
+ case SINK:
+ errorMsg = "The archive property must be specified
in SinkConfig.";
+ break;
+ default:
+ throw new IllegalStateException("Unexpected
ComponentType: " + componentType);
+ }
+ throw new IllegalStateException(errorMsg);
+ }
+ }
+ }
+ return userCodeClassLoader == null
+ ? Thread.currentThread().getContextClassLoader()
+ : userCodeClassLoader;
+ }
+
private void
startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails
functionDetails,
int parallelism, int
instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
@@ -698,6 +652,20 @@ public class LocalRunner implements AutoCloseable {
}
}
+ private ClassLoader isBuiltIn(String component, ComponentType
componentType)
+ throws IOException {
+ switch (componentType) {
+ case FUNCTION:
+ return isBuiltInFunction(component);
+ case SOURCE:
+ return isBuiltInSource(component);
+ case SINK:
+ return isBuiltInSink(component);
+ default:
+ throw new IllegalStateException("Unexpected ComponentType: " +
componentType);
+ }
+ }
+
private ClassLoader isBuiltInFunction(String functionType) throws
IOException {
// Validate the connector type from the locally available connectors
TreeMap<String, FunctionArchive> functions = getFunctions();