This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 359cfa7bc05 Factorize code in LocalRunner (#17400)
359cfa7bc05 is described below

commit 359cfa7bc05775bf6dd004f21b9907610ed3b3d5
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Sep 3 09:18:01 2022 +0200

    Factorize code in LocalRunner (#17400)
---
 .../org/apache/pulsar/functions/LocalRunner.java   | 200 +++++++++------------
 1 file changed, 84 insertions(+), 116 deletions(-)

diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 6c7551f8ac3..a308c98b3da 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.LinkedList;
@@ -59,6 +60,7 @@ import 
org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
+import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -337,47 +339,10 @@ public class LocalRunner implements AutoCloseable {
                 parallelism = functionConfig.getParallelism();
                 if (functionConfig.getRuntime() == 
FunctionConfig.Runtime.JAVA) {
                     userCodeFile = functionConfig.getJar();
-                    ClassLoader builtInFunctionClassLoader = userCodeFile != 
null
-                            ? isBuiltInFunction(userCodeFile)
-                            : null;
-                    if (builtInFunctionClassLoader != null) {
-                        userCodeClassLoader = builtInFunctionClassLoader;
-                        functionDetails = FunctionConfigUtils.convert(
-                                functionConfig,
-                                
FunctionConfigUtils.validateJavaFunction(functionConfig, 
builtInFunctionClassLoader));
-                    } else if (userCodeFile != null && 
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                        File file = 
FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                        ClassLoader functionClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                                
Function.FunctionDetails.ComponentType.FUNCTION,
-                                functionConfig.getClassName(), file, 
narExtractionDirectory);
-                        functionDetails = FunctionConfigUtils.convert(
-                                functionConfig,
-                                
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
-                        userCodeClassLoader = functionClassLoader;
-                        userCodeClassLoaderCreated = true;
-                    } else if (userCodeFile != null) {
-                        File file = new File(userCodeFile);
-                        if (!file.exists()) {
-                            throw new RuntimeException("User jar does not 
exist");
-                        }
-                        ClassLoader functionClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                                
Function.FunctionDetails.ComponentType.FUNCTION,
-                                functionConfig.getClassName(), file, 
narExtractionDirectory);
-                        functionDetails = FunctionConfigUtils.convert(
-                                functionConfig,
-                                
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
-                        userCodeClassLoader = functionClassLoader;
-                        userCodeClassLoaderCreated = true;
-                    } else {
-                        if (!(runtimeEnv == null || runtimeEnv == 
RuntimeEnv.THREAD)) {
-                            throw new IllegalStateException("The jar property 
must be specified in FunctionConfig.");
-                        }
-                        functionDetails = FunctionConfigUtils.convert(
-                                functionConfig,
-                                FunctionConfigUtils.validateJavaFunction(
-                                        functionConfig,
-                                        
Thread.currentThread().getContextClassLoader()));
-                    }
+                    ClassLoader functionClassLoader = extractClassLoader(
+                        userCodeFile, ComponentType.FUNCTION, 
functionConfig.getClassName());
+                    functionDetails = FunctionConfigUtils.convert(
+                        functionConfig, 
FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
                 } else if (functionConfig.getRuntime() == 
FunctionConfig.Runtime.GO) {
                     userCodeFile = functionConfig.getGo();
                 } else if (functionConfig.getRuntime() == 
FunctionConfig.Runtime.PYTHON) {
@@ -395,85 +360,18 @@ public class LocalRunner implements AutoCloseable {
                 inferMissingArguments(sourceConfig);
                 userCodeFile = sourceConfig.getArchive();
                 parallelism = sourceConfig.getParallelism();
-
-                ClassLoader builtInSourceClassLoader = userCodeFile != null ? 
isBuiltInSource(userCodeFile) : null;
-                if (builtInSourceClassLoader != null) {
-                    functionDetails = SourceConfigUtils.convert(
-                            sourceConfig, 
SourceConfigUtils.validateAndExtractDetails(
-                                    sourceConfig, builtInSourceClassLoader, 
true));
-                    userCodeClassLoader = builtInSourceClassLoader;
-                } else if (userCodeFile != null && 
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                    File file = 
FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    ClassLoader sourceClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                            Function.FunctionDetails.ComponentType.SOURCE,
-                            sourceConfig.getClassName(), file, 
narExtractionDirectory);
-                    functionDetails = SourceConfigUtils.convert(
-                            sourceConfig,
-                            
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, 
true));
-                    userCodeClassLoader = sourceClassLoader;
-                    userCodeClassLoaderCreated = true;
-                } else if (userCodeFile != null) {
-                    File file = new File(userCodeFile);
-                    if (!file.exists()) {
-                        throw new RuntimeException("Source archive (" + 
userCodeFile + ") does not exist");
-                    }
-                    ClassLoader sourceClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                            Function.FunctionDetails.ComponentType.SOURCE,
-                            sourceConfig.getClassName(), file, 
narExtractionDirectory);
-                    functionDetails = SourceConfigUtils.convert(sourceConfig,
-                            
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, 
true));
-                    userCodeClassLoader = sourceClassLoader;
-                    userCodeClassLoaderCreated = true;
-                } else {
-                    if (!(runtimeEnv == null || runtimeEnv == 
RuntimeEnv.THREAD)) {
-                        throw new IllegalStateException("The archive property 
must be specified in SourceConfig.");
-                    }
-                    functionDetails = SourceConfigUtils.convert(
-                            sourceConfig, 
SourceConfigUtils.validateAndExtractDetails(
-                                    sourceConfig, 
Thread.currentThread().getContextClassLoader(), true));
-                }
+                ClassLoader sourceClassLoader = extractClassLoader(
+                    userCodeFile, ComponentType.SOURCE, 
sourceConfig.getClassName());
+                functionDetails = SourceConfigUtils.convert(
+                    sourceConfig, 
SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, 
true));
             } else if (sinkConfig != null) {
                 inferMissingArguments(sinkConfig);
                 userCodeFile = sinkConfig.getArchive();
                 parallelism = sinkConfig.getParallelism();
-
-                ClassLoader builtInSinkClassLoader = userCodeFile != null ? 
isBuiltInSink(userCodeFile) : null;
-                if (builtInSinkClassLoader != null) {
-                    functionDetails = SinkConfigUtils.convert(
-                            sinkConfig, 
SinkConfigUtils.validateAndExtractDetails(
-                                    sinkConfig, builtInSinkClassLoader, null, 
true));
-                    userCodeClassLoader = builtInSinkClassLoader;
-                } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                    File file = 
FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    ClassLoader sinkClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                            Function.FunctionDetails.ComponentType.SINK,
-                            sinkConfig.getClassName(), file, 
narExtractionDirectory);
-                    functionDetails = SinkConfigUtils.convert(
-                            sinkConfig,
-                            
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, 
true));
-                    userCodeClassLoader = sinkClassLoader;
-                    userCodeClassLoaderCreated = true;
-                } else if (userCodeFile != null) {
-                    File file = new File(userCodeFile);
-                    if (!file.exists()) {
-                        throw new RuntimeException("Sink archive does not 
exist");
-                    }
-                    ClassLoader sinkClassLoader = 
FunctionCommon.getClassLoaderFromPackage(
-                            Function.FunctionDetails.ComponentType.SINK,
-                            sinkConfig.getClassName(), file, 
narExtractionDirectory);
-                    functionDetails = SinkConfigUtils.convert(
-                            sinkConfig,
-                            
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, 
true));
-                    userCodeClassLoader = sinkClassLoader;
-                    userCodeClassLoaderCreated = true;
-                } else {
-                    if (!(runtimeEnv == null || runtimeEnv == 
RuntimeEnv.THREAD)) {
-                        throw new IllegalStateException("The archive property 
must be specified in SourceConfig.");
-                    }
-                    functionDetails = SinkConfigUtils.convert(
-                            sinkConfig, 
SinkConfigUtils.validateAndExtractDetails(
-                                    sinkConfig, 
Thread.currentThread().getContextClassLoader(), null, true));
-                }
+                ClassLoader sinkClassLoader = extractClassLoader(
+                    userCodeFile, ComponentType.SINK, 
sinkConfig.getClassName());
+                functionDetails = SinkConfigUtils.convert(
+                    sinkConfig, 
SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, 
true));
             } else {
                 throw new IllegalArgumentException("Must specify Function, 
Source or Sink config");
             }
@@ -528,6 +426,62 @@ public class LocalRunner implements AutoCloseable {
         }
     }
 
+    private ClassLoader extractClassLoader(String userCodeFile, ComponentType 
componentType, String className)
+        throws IOException, URISyntaxException {
+        userCodeClassLoader = userCodeFile != null ? isBuiltIn(userCodeFile, 
componentType) : null;
+        if (userCodeClassLoader == null) {
+            if (userCodeFile != null && 
Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                        componentType, className, file, 
narExtractionDirectory);
+                userCodeClassLoaderCreated = true;
+            } else if (userCodeFile != null) {
+                File file = new File(userCodeFile);
+                if (!file.exists()) {
+                    String errorMsg;
+                    switch (componentType) {
+                        case FUNCTION:
+                            errorMsg = "User jar";
+                            break;
+                        case SOURCE:
+                            errorMsg = "Source archive";
+                            break;
+                        case SINK:
+                            errorMsg = "Sink archive";
+                            break;
+                        default:
+                            throw new IllegalStateException("Unexpected value: 
" + componentType);
+                    }
+                    throw new RuntimeException(errorMsg + " (" + userCodeFile 
+ ") does not exist");
+                }
+                userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                        componentType, className, file, 
narExtractionDirectory);
+                userCodeClassLoaderCreated = true;
+            } else {
+                if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
+                    String errorMsg;
+                    switch (componentType) {
+                        case FUNCTION:
+                            errorMsg = "The jar property must be specified in 
FunctionConfig.";
+                            break;
+                        case SOURCE:
+                            errorMsg = "The archive property must be specified 
in SourceConfig.";
+                            break;
+                        case SINK:
+                            errorMsg = "The archive property must be specified 
in SinkConfig.";
+                            break;
+                        default:
+                            throw new IllegalStateException("Unexpected 
ComponentType: " + componentType);
+                    }
+                    throw new IllegalStateException(errorMsg);
+                }
+            }
+        }
+        return userCodeClassLoader == null
+            ? Thread.currentThread().getContextClassLoader()
+            : userCodeClassLoader;
+    }
+
     private void 
startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails 
functionDetails,
                                            int parallelism, int 
instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, 
AuthenticationConfig authConfig,
@@ -698,6 +652,20 @@ public class LocalRunner implements AutoCloseable {
         }
     }
 
+    private ClassLoader isBuiltIn(String component, ComponentType 
componentType)
+        throws IOException {
+        switch (componentType) {
+            case FUNCTION:
+                return isBuiltInFunction(component);
+            case SOURCE:
+                return isBuiltInSource(component);
+            case SINK:
+                return isBuiltInSink(component);
+            default:
+                throw new IllegalStateException("Unexpected ComponentType: " + 
componentType);
+        }
+    }
+
     private ClassLoader isBuiltInFunction(String functionType) throws 
IOException {
         // Validate the connector type from the locally available connectors
         TreeMap<String, FunctionArchive> functions = getFunctions();

Reply via email to