This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c0fd11  allowing users to specify function jar in yml file (#1899)
7c0fd11 is described below

commit 7c0fd11542a5a87d62628b12918af03065f9bef0
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Jun 4 12:55:17 2018 -0700

    allowing users to specify function jar in yml file (#1899)
    
    * allowing users to specify function jar in yml file
    
    * fixing unit tests
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  4 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 43 +++++++++++++++++-----
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 17 +++++++--
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 13 ++++++-
 .../pulsar/functions/utils/FunctionConfig.java     |  5 +++
 .../apache/pulsar/functions/utils/SinkConfig.java  |  3 ++
 .../pulsar/functions/utils/SourceConfig.java       |  3 ++
 .../org/apache/pulsar/functions/utils/Utils.java   |  4 ++
 .../validation/ConfigValidationAnnotations.java    |  9 +++++
 .../functions/utils/validation/ValidatorImpls.java | 17 ++++++++-
 10 files changed, 101 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 0826956..20499e4 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -76,7 +77,7 @@ import static org.testng.Assert.assertEquals;
  * Unit test of {@link CmdFunctions}.
  */
 @Slf4j
-@PrepareForTest({ CmdFunctions.class, Reflections.class, 
StorageClientBuilder.class })
+@PrepareForTest({ CmdFunctions.class, Reflections.class, 
StorageClientBuilder.class, Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*" })
 public class CmdFunctionsTest {
 
@@ -127,6 +128,7 @@ public class CmdFunctionsTest {
         when(Reflections.classImplementsIface(anyString(), 
any())).thenReturn(true);
         when(Reflections.createInstance(eq(DummyFunction.class.getName()), 
any(File.class))).thenReturn(new DummyFunction());
         when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), 
any(File.class))).thenReturn(new DefaultSerDe(String.class));
+        PowerMockito.stub(PowerMockito.method(Utils.class, 
"fileExists")).toReturn(true);
     }
 
 //    @Test
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 30916cd..e726cce 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -79,6 +79,7 @@ import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -361,15 +362,18 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setAutoAck(true);
             }
 
-
             if (null != jarFile) {
-                functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
-                userCodeFile = jarFile;
-            } else if (null != pyFile) {
-                functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
-                userCodeFile = pyFile;
-            } else {
-                throw new ParameterException("Either a Java jar or a Python 
file needs to be specified for the function");
+                functionConfig.setJar(jarFile);
+            }
+
+            if (null != pyFile) {
+                functionConfig.setPy(pyFile);
+            }
+
+            if (functionConfig.getJar() != null) {
+                userCodeFile = functionConfig.getJar();
+            } else if (functionConfig.getPy() != null) {
+                userCodeFile = functionConfig.getPy();
             }
 
             // infer default vaues
@@ -378,8 +382,22 @@ public class CmdFunctions extends CmdBase {
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
 
+            if (functionConfig.getJar() != null && functionConfig.getPy() != 
null) {
+                throw new ParameterException("Either a Java jar or a Python 
file needs to"
+                        + " be specified for the function. Cannot specify 
both.");
+            }
+
+            if (functionConfig.getJar() == null && functionConfig.getPy() == 
null) {
+                throw new ParameterException("Either a Java jar or a Python 
file needs to"
+                        + " be specified for the function. Please specify 
one.");
+            }
+
+            if (!fileExists(userCodeFile)) {
+                throw new ParameterException("File " + userCodeFile + " does 
not exist");
+            }
+
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                File file = new File(jarFile);
+                File file = new File(functionConfig.getJar());
                 ClassLoader userJarLoader;
                 try {
                     userJarLoader = Reflections.loadJar(file);
@@ -394,6 +412,7 @@ public class CmdFunctions extends CmdBase {
                 // Need to load jar and set context class loader before calling
                 ConfigValidation.validateConfig(functionConfig, 
functionConfig.getRuntime().name());
             } catch (Exception e) {
+                log.info("ex: {}", e, e);
                 throw new ParameterException(e.getMessage());
             }
         }
@@ -416,6 +435,12 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setParallelism(1);
             }
 
+            if (functionConfig.getJar() != null) {
+                functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+            } else if (functionConfig.getPy() != null) {
+                functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
+            }
+
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (windowConfig != null) {
                 WindowUtils.inferDefaultConfigs(windowConfig);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index fdbe291..607a4d2 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -51,6 +51,7 @@ import java.util.Map;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 import static org.apache.pulsar.functions.utils.Utils.loadConfig;
 
@@ -175,7 +176,7 @@ public class CmdSinks extends CmdBase {
             if (null != tenant) {
                 sinkConfig.setTenant(tenant);
             }
-            
+
             if (null != namespace) {
                 sinkConfig.setNamespace(namespace);
             }
@@ -208,8 +209,8 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setParallelism(parallelism);
             }
 
-            if (null == jarFile) {
-                throw new IllegalArgumentException("Connector JAR not 
specfied");
+            if (null != jarFile) {
+                sinkConfig.setJar(jarFile);
             }
 
             sinkConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
@@ -233,7 +234,15 @@ public class CmdSinks extends CmdBase {
         }
 
         protected void validateSinkConfigs(SinkConfig sinkConfig) {
-            File file = new File(jarFile);
+            if (null == sinkConfig.getJar()) {
+                throw new ParameterException("Sink jar not specfied");
+            }
+
+            if (!fileExists(sinkConfig.getJar())) {
+                throw new ParameterException("Jar file " + sinkConfig.getJar() 
+ " does not exist");
+            }
+
+            File file = new File(sinkConfig.getJar());
             ClassLoader userJarLoader;
             try {
                 userJarLoader = Reflections.loadJar(file);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 97d5fd9..ffc4891 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 import static org.apache.pulsar.functions.utils.Utils.loadConfig;
 
@@ -191,8 +192,8 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setParallelism(parallelism);
             }
 
-            if (null == jarFile) {
-                throw new ParameterException("Source JAR not specfied");
+            if (jarFile != null) {
+                sourceConfig.setJar(jarFile);
             }
 
             sourceConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
@@ -216,6 +217,14 @@ public class CmdSources extends CmdBase {
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
+            if (null == sourceConfig.getJar()) {
+                throw new ParameterException("Source jar not specfied");
+            }
+
+            if (!fileExists(sourceConfig.getJar())) {
+                throw new ParameterException("Jar file " + 
sourceConfig.getJar() + " does not exist");
+            }
+
             File file = new File(jarFile);
             ClassLoader userJarLoader;
             try {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index be40e01..9671d50 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
@@ -95,4 +96,8 @@ public class FunctionConfig {
     private WindowConfig windowConfig;
     @isPositiveNumber
     private Long timeoutMs;
+    @isFileExists
+    private String jar;
+    @isFileExists
+    private String py;
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 9fa0307..f4ee77c 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -24,6 +24,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
@@ -60,4 +61,6 @@ public class SinkConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     @isValidResources
     private Resources resources;
+    @isFileExists
+    private String jar;
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index cdede64..295f339 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -25,6 +25,7 @@ import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.functions.api.SerDe;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
@@ -62,4 +63,6 @@ public class SourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     @isValidResources
     private Resources resources;
+    @isFileExists
+    private String jar;
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 3f7c2e4..c534ed3 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -205,4 +205,8 @@ public class Utils {
 
         return typeArg;
     }
+
+    public static boolean fileExists(String file) {
+        return new File(file).exists();
+    }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index 934f6f5..e6e0583 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -189,6 +189,15 @@ public class ConfigValidationAnnotations {
     }
 
     /**
+     * check if file exists
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isFileExists {
+        Class<?> validatorClass() default ValidatorImpls.FileValidator.class;
+    }
+
+    /**
      * checks function config as a whole to make sure all fields are valid
      */
     @Retention(RetentionPolicy.RUNTIME)
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 3e0a168..2ee6044 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -33,12 +33,14 @@ import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.WindowConfig;
 
+import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
@@ -747,9 +749,22 @@ public class ValidatorImpls {
         }
     }
 
+    public static class FileValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            new StringValidator().validateField(name, o);
 
+            if (!fileExists((String) o)) {
+                throw new IllegalArgumentException
+                        (String.format("File %s specified in field '%s' does 
not exist", o, name));
+            }
+        }
+    }
 
-        /**
+    /**
      * Validates basic types.
      */
     public static class SimpleTypeValidator extends Validator {

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to