This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f36c43f  Move all validation/inferring missing args to serverside (#2907)
f36c43f is described below

commit f36c43f3592ce451176a45076db2487260aaabe2
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Nov 2 14:44:12 2018 -0700

    Move all validation/inferring missing args to serverside (#2907)
    
    * Move all validation/inferring missing args to serverside
    
    * Moved tests to serverside
    
    * Fixed all tests
    
    * Moved tests from admin to backend
    
    * remove unused var
    
    * Make it explicitly narclassloader
    
    * Dont copy nar files
    
    * Copy nar files
    
    * Some tests working
    
    * Do not mock ConnectorIo
    
    * Fix build
    
    * Some more enhancements to the tests
    
    * Ignore io packages in powermock
    
    * Fixed unittests
    
    * Remove unused stuff
    
    * Move changes from client side to serverside
    
    * More serverside tests
    
    * cleanup
    
    * Fixed unittests
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 257 ---------------------
 pulsar-client-tools/pom.xml                        |  34 +--
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  17 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  26 ---
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  26 ---
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  | 236 +------------------
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 210 +----------------
 .../pulsar/functions/utils/SinkConfigUtils.java    |   8 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |   4 +-
 .../pulsar/functions/utils/io/ConnectorUtils.java  |   3 +-
 pulsar-functions/worker/pom.xml                    |  44 ++++
 .../functions/worker/rest/api/FunctionsImpl.java   |   5 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  96 +++++++-
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  | 138 ++++++++---
 .../rest/api/v2/SourceApiV2ResourceTest.java       | 161 ++++++++++---
 15 files changed, 390 insertions(+), 875 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 7cd870f..077a742 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -274,38 +274,6 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateFunctionWithHttpUrl() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = TEST_NAME + "-input-topic";
-        String outputTopicName = TEST_NAME + "-output-topic";
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmd.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--inputs", inputTopicName,
-            "--output", outputTopicName,
-            "--jar", url,
-            "--tenant", "sample",
-            "--namespace", "ns1",
-            "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupted Jar File"));
-        assertEquals(fnName, creater.getFunctionName());
-        assertEquals(inputTopicName, creater.getInputs());
-        assertEquals(outputTopicName, creater.getOutput());
-    }
-
-    @Test
     public void testGetFunctionStatus() throws Exception {
         String fnName = TEST_NAME + "-function";
         String tenant = "sample";
@@ -347,60 +315,6 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateSink() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = TEST_NAME + "-input-topic";
-
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmdSinks.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--inputs", inputTopicName,
-            "--archive", url,
-            "--tenant", "sample",
-            "--namespace", "ns1"
-        });
-
-        CreateSink creater = cmdSinks.getCreateSink();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupt User PackageFile " + url));
-        assertEquals(url, creater.archive);
-    }
-
-    @Test
-    public void testCreateSource() throws Exception {
-        String fnName = TEST_NAME + "-function";
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmdSources.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--archive", url,
-            "--tenant", "sample",
-            "--namespace", "ns1",
-            "--destination-topic-name", "input",
-        });
-
-        CreateSource creater = cmdSources.getCreateSource();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupt User PackageFile " + url));
-        assertEquals(url, creater.archive);
-    }
-
-    @Test
     public void testCreateFunctionWithTopicPatterns() throws Exception {
         String fnName = TEST_NAME + "-function";
         String topicPatterns = "persistent://tenant/ns/topicPattern*";
@@ -426,46 +340,6 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateWithoutTenant() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = 
"persistent://tenant/standalone/namespace/input-topic";
-        String outputTopicName = 
"persistent://tenant/standalone/namespace/output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("public", creater.getFunctionConfig().getTenant());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
-    }
-
-    @Test
-    public void testCreateWithoutNamespace() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = 
"persistent://tenant/standalone/namespace/input-topic";
-        String outputTopicName = 
"persistent://tenant/standalone/namespace/output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("public", creater.getFunctionConfig().getTenant());
-        assertEquals("default", creater.getFunctionConfig().getNamespace());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
-    }
-
-    @Test
     public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
         String outputTopicName = TEST_NAME + "-output-topic";
@@ -491,25 +365,6 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateWithoutFunctionName() throws Exception {
-        String inputTopicName = TEST_NAME + "-input-topic";
-        String outputTopicName = TEST_NAME + "-output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("CmdFunctionsTest$DummyFunction", 
creater.getFunctionConfig().getName());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
-    }
-
-    @Test
     public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
         cmd.run(new String[] {
@@ -729,89 +584,6 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void TestCreateFunctionParallelism() throws Exception{
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-                "--parallelism", "1"
-        };
-
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-                "--parallelism", "-1"
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Function 
parallelism should positive number");
-
-    }
-
-    @Test
-    public void TestCreateTopicName() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String wrongOutputTopicName = TEST_NAME + "-output-topic/test:";
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", wrongOutputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Output topic 
" + wrongOutputTopicName + " is invalid");
-    }
-
-    @Test
-    public void TestCreateClassName() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", 
DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String cannotLoadClass = "com.test.Function";
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", 
DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", cannotLoadClass,
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "User class 
must be in class path");
-    }
-
-    @Test
     public void testCreateFunctionWithCpu() throws Exception {
         String fnName = TEST_NAME + "-function";
         String inputTopicName = TEST_NAME + "-input-topic";
@@ -998,35 +770,6 @@ public class CmdFunctionsTest {
         verify(functions, 
times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString());
     }
 
-    @Test
-    public void TestCreateSameInOutTopic() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", inputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs,
-                "Output topic " + inputTopicName
-                        + " is also being used as an input topic (topics must 
be one or the other)");
-
-    }
-
 
     public static class ConsoleOutputCapturer {
         private ByteArrayOutputStream stdout;
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 73faa52..8e168f3 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -70,37 +70,10 @@
     <!-- functions related dependencies (begin) -->
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-functions-worker</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>typetools</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
     </dependency>
 
-    <!-- functions related dependencies (end) -->
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-cassandra</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-twitter</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
 
   <build>
@@ -118,12 +91,7 @@
             </goals>
             <configuration>
               <tasks>
-                <echo>copy test sink package</echo>
-                <mkdir dir="${basedir}/src/test/resources"/>
-                <copy file="${basedir}/../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/>
-                <echo>copy test source package</echo>
-                <mkdir dir="${basedir}/src/test/resources"/>
-                <copy file="${basedir}/../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/>
+                <copy file="${basedir}/pom.xml" tofile="${basedir}/src/test/resources/dummy.nar"/>
               </tasks>
             </configuration>
           </execution>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 2ec5aff..24597eb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -65,7 +65,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -297,9 +296,6 @@ public class CmdFunctions extends CmdBase {
         protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
-        // The classLoader associated with this function defn
-        protected ClassLoader classLoader;
-
 
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_className)) className = 
DEPRECATED_className;
@@ -476,9 +472,6 @@ public class CmdFunctions extends CmdBase {
                 userCodeFile = functionConfig.getPy();
             }
 
-            // infer default vaues
-            FunctionConfigUtils.inferMissingArguments(functionConfig);
-
             // check if configs are valid
             validateFunctionConfigs(functionConfig);
         }
@@ -501,15 +494,7 @@ public class CmdFunctions extends CmdBase {
             }
             if (!isBlank(functionConfig.getPy()) && 
!Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) &&
                     !new File(functionConfig.getPy()).exists()) {
-                throw new ParameterException("The specified jar file does not exist");
-            }
-
-            try {
-                // Need to load jar and set context class loader before calling
-                String functionPkgUrl = 
Utils.isFunctionPackageUrlSupported(userCodeFile) ? userCodeFile : null;
-                classLoader = FunctionConfigUtils.validate(functionConfig, 
functionPkgUrl, null);
-            } catch (Exception e) {
-                throw new IllegalArgumentException(e.getMessage());
+                throw new ParameterException("The specified python file does not exist");
             }
         }
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 48601b2..d14c053 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static 
org.apache.pulsar.functions.utils.SinkConfigUtils.inferMissingArguments;
 import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
@@ -38,7 +37,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -51,12 +49,10 @@ import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -306,8 +302,6 @@ public class CmdSinks extends CmdBase {
 
         protected SinkConfig sinkConfig;
 
-        protected NarClassLoader classLoader;
-
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = 
DEPRECATED_subsName;
             if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern 
= DEPRECATED_topicsPattern;
@@ -421,8 +415,6 @@ public class CmdSinks extends CmdBase {
             if (null != sinkConfigString) {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
-            
-            inferMissingArguments(sinkConfig);
 
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
@@ -445,24 +437,6 @@ public class CmdSinks extends CmdBase {
                     throw new IllegalArgumentException(String.format("Sink 
Archive file %s does not exist", sinkConfig.getArchive()));
                 }
             }
-
-            try {
-                // Need to load jar and set context class loader before calling
-                String sourcePkgUrl = 
Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) ? 
sinkConfig.getArchive() : null;
-                Path archivePath = 
(Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) || 
sinkConfig.getArchive().startsWith(BUILTIN)) ? null : new 
File(sinkConfig.getArchive()).toPath();
-                classLoader = SinkConfigUtils.validate(sinkConfig, 
archivePath, sourcePkgUrl, null);
-            } catch (Exception e) {
-                throw new ParameterException(e.getMessage());
-            }
-        }
-
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails 
createSinkConfigProto2(SinkConfig sinkConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
-                    = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            
Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, 
classLoader)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
         }
 
         protected String validateSinkType(String sinkType) throws IOException {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index a2271e5..32ebfc9 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static 
org.apache.pulsar.functions.utils.SourceConfigUtils.inferMissingArguments;
 import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
@@ -38,7 +37,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,13 +52,10 @@ import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -296,8 +291,6 @@ public class CmdSources extends CmdBase {
 
         protected SourceConfig sourceConfig;
 
-        protected NarClassLoader classLoader;
-
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
             if (!isBlank(DEPRECATED_destinationTopicName)) 
destinationTopicName = DEPRECATED_destinationTopicName;
@@ -380,8 +373,6 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
             }
 
-            inferMissingArguments(sourceConfig);
-
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
         }
@@ -402,23 +393,6 @@ public class CmdSources extends CmdBase {
                     throw new IllegalArgumentException(String.format("Source 
Archive %s does not exist", sourceConfig.getArchive()));
                 }
             }
-
-            try {
-             // Need to load jar and set context class loader before calling
-                String sourcePkgUrl = 
Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) ? 
sourceConfig.getArchive() : null;
-                Path archivePath = 
(Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) || 
sourceConfig.getArchive().startsWith(BUILTIN)) ? null : new 
File(sourceConfig.getArchive()).toPath();
-                classLoader = SourceConfigUtils.validate(sourceConfig, 
archivePath, sourcePkgUrl, null);
-            } catch (Exception e) {
-                throw new ParameterException(e.getMessage());
-            }
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails 
createSourceConfigProto2(SourceConfig sourceConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
-                    = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            
Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, 
classLoader)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
         }
 
         protected String validateSourceType(String sourceType) throws 
IOException {
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index aea19fd..cbe5296 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -44,8 +44,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.io.cassandra.CassandraStringSink;
-import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -68,7 +66,7 @@ public class TestCmdSinks {
     private static final String TENANT = "test-tenant";
     private static final String NAMESPACE = "test-namespace";
     private static final String NAME = "test";
-    private static final String CLASS_NAME = 
CassandraStringSink.class.getName();
+    private static final String CLASS_NAME = "SomeRandomClassName";
     private static final String INPUTS = "test-src1,test-src2";
     private static final List<String> INPUTS_LIST;
     static {
@@ -86,8 +84,7 @@ public class TestCmdSinks {
     private static final FunctionConfig.ProcessingGuarantees 
PROCESSING_GUARANTEES
             = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
     private static final Integer PARALLELISM = 1;
-    private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar";
-    private static final String WRONG_JAR_FILE_NAME = "pulsar-io-twitter.nar";
+    private static final String JAR_FILE_NAME = "dummy.nar";
     private String JAR_FILE_PATH;
     private String WRONG_JAR_PATH;
     private static final Double CPU = 100.0;
@@ -123,7 +120,6 @@ public class TestCmdSinks {
             throw new RuntimeException("Failed to file required test archive: 
" + JAR_FILE_NAME);
         }
         JAR_FILE_PATH = file.getFile();
-        WRONG_JAR_PATH = 
Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new 
File(JAR_FILE_PATH)));
     }
 
@@ -168,72 +164,6 @@ public class TestCmdSinks {
     }
 
     @Test
-    public void testMissingTenant() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setTenant(PUBLIC_TENANT);
-        testCmdSinkCliMissingArgs(
-                null,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test
-    public void testMissingNamespace() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                null,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink name cannot be null")
-    public void testMissingName() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setName(null);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                null,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test
     public void testMissingInput() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setInputSpecs(new HashMap<>());
@@ -303,27 +233,6 @@ public class TestCmdSinks {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Must specify at least one topic of input via 
topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs")
-    public void testMissingAllInputTopics() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                null,
-                null,
-                null,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
     @Test
     public void testMissingProcessingGuarantees() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -367,51 +276,7 @@ public class TestCmdSinks {
                 sinkConfig
         );
     }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testNegativeParallelism() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setParallelism(-1);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                -1,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testZeroParallelism() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setParallelism(0);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                0,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
+    
     @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink archive not specfied")
     public void testMissingArchive() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -434,28 +299,6 @@ public class TestCmdSinks {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
-    public void testInvalidJarWithNoSource() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                WRONG_JAR_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
     @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar" +
             " does not exist")
     public void testInvalidJar() throws Exception {
@@ -588,36 +431,6 @@ public class TestCmdSinks {
     }
 
     @Test
-    public void testCmdSinkConfigFileMissingTenant() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setTenant(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setTenant(PUBLIC_TENANT);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test
-    public void testCmdSinkConfigFileMissingNamespace() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setNamespace(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink name cannot be null")
-    public void testCmdSinkConfigFileMissingName() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setName(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setName(null);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test
     public void testCmdSinkConfigFileMissingTopicToSerdeClassName() throws 
Exception {
         SinkConfig testSinkConfig = getSinkConfig();
 
@@ -633,19 +446,6 @@ public class TestCmdSinks {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs")
-    public void testCmdSinkConfigFileMissingAllInput() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setInputs(null);
-        testSinkConfig.setTopicToSchemaType(null);
-        testSinkConfig.setTopicToSerdeClassName(null);
-        testSinkConfig.setTopicsPattern(null);
-        testSinkConfig.setInputSpecs(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     @Test
     public void testCmdSinkConfigFileMissingConfig() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
@@ -656,26 +456,6 @@ public class TestCmdSinks {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testCmdSinkConfigFileZeroParallelism() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setParallelism(0);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setParallelism(0);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testCmdSinkConfigFileNegativeParallelism() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setParallelism(-1);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setParallelism(-1);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     @Test
     public void testCmdSinkConfigFileMissingProcessingGuarantees() throws 
Exception {
         SinkConfig testSinkConfig = getSinkConfig();
@@ -716,16 +496,6 @@ public class TestCmdSinks {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
-    public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setArchive(WRONG_JAR_PATH);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     public void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig 
expectedSinkConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 32a6c60..33d7513 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -70,10 +70,8 @@ public class TestCmdSources {
     private static final FunctionConfig.ProcessingGuarantees 
PROCESSING_GUARANTEES
             = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
     private static final Integer PARALLELISM = 1;
-    private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar";
-    private static final String WRONG_JAR_FILE_NAME = 
"pulsar-io-cassandra.nar";
+    private static final String JAR_FILE_NAME = "dummy.nar";
     private String JAR_FILE_PATH;
-    private String WRONG_JAR_PATH;
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
@@ -103,7 +101,6 @@ public class TestCmdSources {
         mockStatic(CmdFunctions.class);
         PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
-        WRONG_JAR_PATH = 
Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new 
File(JAR_FILE_PATH)));
     }
 
@@ -142,84 +139,6 @@ public class TestCmdSources {
     }
 
     @Test
-    public void testMissingTenant() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setTenant(PUBLIC_TENANT);
-        testCmdSourceCliMissingArgs(
-                null,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test
-    public void testMissingNamespace() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                null,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source name cannot be null")
-    public void testMissingName() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setName(null);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                null,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Topic name cannot be null")
-    public void testMissingTopicName() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setTopicName(null);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                null,
-                SERDE_CLASS_NAME,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test
     public void testMissingSerdeClassName() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setSerdeClassName(null);
@@ -278,44 +197,6 @@ public class TestCmdSources {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testNegativeParallelism() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setParallelism(-1);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                -1,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testZeroParallelism() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setParallelism(0);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                0,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
     @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source archive not specfied")
     public void testMissingArchive() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
@@ -355,25 +236,6 @@ public class TestCmdSources {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
-    public void testInvalidJarWithNoSource() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                WRONG_JAR_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
     @Test
     public void testMissingConfig() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
@@ -474,46 +336,6 @@ public class TestCmdSources {
     }
 
     @Test
-    public void testCmdSourceConfigFileMissingTenant() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setTenant(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setTenant(PUBLIC_TENANT);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test
-    public void testCmdSourceConfigFileMissingNamespace() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setNamespace(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source name cannot be null")
-    public void testCmdSourceConfigFileMissingName() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setName(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setName(null);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Topic name cannot be null")
-    public void testCmdSourceConfigFileMissingTopicName() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setTopicName(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setTopicName(null);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test
     public void testCmdSourceConfigFileMissingSerdeClassname() throws 
Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setSerdeClassName(null);
@@ -533,26 +355,6 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testCmdSourceConfigFileZeroParallelism() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setParallelism(0);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setParallelism(0);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testCmdSourceConfigFileNegativeParallelism() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setParallelism(-1);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setParallelism(-1);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
     @Test
     public void testCmdSourceConfigFileMissingProcessingGuarantees() throws 
Exception {
         SourceConfig testSourceConfig = getSourceConfig();
@@ -593,16 +395,6 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, 
expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
-    public void testCmdSourceConfigFileInvalidJarNoSource() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setArchive(WRONG_JAR_PATH);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
     public void testCmdSourceConfigFile(SourceConfig testSourceConfig, 
SourceConfig expectedSourceConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 558f33b..2cbd460 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -56,10 +56,10 @@ public class SinkConfigUtils {
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
 
-        boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = 
!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
+            if 
(!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(Utils.FILE)) {
                 if (isBlank(sinkConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be present for archive with file-url");
                 }
@@ -286,9 +286,7 @@ public class SinkConfigUtils {
 
         NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
         if (classLoader == null) {
-            // This happens at the cli for builtin. There is no need to check this since
-            // the actual check will be done at serverside
-            return null;
+            throw new IllegalArgumentException("Sink Package is not provided");
         }
 
         String sinkClassName;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 07d593d..6ce1db3 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -210,9 +210,7 @@ public class SourceConfigUtils {
 
         NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
         if (classLoader == null) {
-            // This happens at the cli for builtin. There is no need to check this since
-            // the actual check will be done at serverside
-            return null;
+            throw new IllegalArgumentException("Source Package is not provided");
         }
 
         String sourceClassName;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index c6feb5a..8780671 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,8 +46,7 @@ public class ConnectorUtils {
     /**
      * Extract the Pulsar IO Source class from a connector archive.
      */
-    public static String getIOSourceClass(ClassLoader classLoader) throws 
IOException {
-        NarClassLoader ncl = (NarClassLoader) classLoader;
+    public static String getIOSourceClass(NarClassLoader ncl) throws 
IOException {
         String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
 
         ConnectorDefinition conf = 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 24bbd6e..eb67777 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -129,6 +129,50 @@
       </exclusions>
     </dependency>
 
+    <!-- functions related dependencies (end) -->
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-cassandra</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-twitter</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
+  <build>
+    <plugins>
+      <!-- this task will copy nar files to resources for the test to work -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <tasks>
+                <echo>copy test sink package</echo>
+                <mkdir dir="${basedir}/src/test/resources"/>
+                <copy 
file="${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar"
 tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/>
+                <echo>copy test source package</echo>
+                <mkdir dir="${basedir}/src/test/resources"/>
+                <copy 
file="${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar"
 tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 89e3b48..914a691 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -1058,7 +1058,7 @@ public class FunctionsImpl {
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String componentName,
             String functionDetailsJson, String componentConfigJson, String 
componentType,
-            String functionPkgUrl, File uploadedInputStreamAsFile) throws 
IOException, URISyntaxException {
+            String functionPkgUrl, File uploadedInputStreamAsFile) throws 
IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1071,12 +1071,14 @@ public class FunctionsImpl {
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            FunctionConfigUtils.inferMissingArguments(functionConfig);
             ClassLoader clsLoader = 
FunctionConfigUtils.validate(functionConfig, functionPkgUrl, 
uploadedInputStreamAsFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
+            SourceConfigUtils.inferMissingArguments(sourceConfig);
             if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                 String builtinArchive = sourceConfig.getArchive();
                 if (builtinArchive.startsWith(BUILTIN)) {
@@ -1094,6 +1096,7 @@ public class FunctionsImpl {
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
+            SinkConfigUtils.inferMissingArguments(sinkConfig);
             if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                 String builtinArchive = sinkConfig.getArchive();
                 if (builtinArchive.startsWith(BUILTIN)) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index cd55916..1a7b51a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -165,6 +165,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -181,6 +182,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
@@ -197,6 +199,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Name is not provided");
     }
 
@@ -213,6 +216,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Package is not provided");
     }
 
@@ -229,6 +233,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
                 className,
                 parallelism,
+                null,
                 "No input topic(s) specified for the function");
     }
 
@@ -245,6 +250,7 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Package is not provided");
     }
 
@@ -261,9 +267,96 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             null,
             parallelism,
+                null,
                 "Function classname cannot be null");
     }
 
+    @Test
+    public void testRegisterFunctionWrongClassName() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                "UnknownClass",
+                parallelism,
+                null,
+                "User class must be in class path");
+    }
+
+    @Test
+    public void testRegisterFunctionWrongParallelism() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                -2,
+                null,
+                "Function parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterFunctionSameInputOutput() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                topicsToSerDeClassName.keySet().iterator().next(),
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Output topic " + topicsToSerDeClassName.keySet().iterator().next()
+                        + " is also being used as an input topic (topics must be one or the other)");
+    }
+
+    @Test
+    public void testRegisterFunctionWrongOutputTopic() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                function + "-output-topic/test:",
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Output topic " + function + "-output-topic/test:" + " is invalid");
+    }
+
+    @Test
+    public void testRegisterFunctionHttpUrl() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupted Jar File");
+    }
+
     private void testRegisterFunctionMissingArguments(
             String tenant,
             String namespace,
@@ -275,6 +368,7 @@ public class FunctionApiV2ResourceTest {
             String outputSerdeClassName,
             String className,
             Integer parallelism,
+            String functionPkgUrl,
             String errorExpected) throws IOException {
         FunctionConfig functionConfig = new FunctionConfig();
         if (tenant != null) {
@@ -309,7 +403,7 @@ public class FunctionApiV2ResourceTest {
                 function,
                 inputStream,
                 details,
-                null,
+                functionPkgUrl,
                 null,
                 new Gson().toJson(functionConfig),
                 FunctionsImpl.FUNCTION,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 7bf1a46..57e3cc5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -26,7 +26,6 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -38,8 +37,7 @@ import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -53,8 +51,10 @@ import org.testng.annotations.Test;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -62,7 +62,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -78,7 +77,7 @@ import static org.testng.Assert.assertEquals;
  * Unit test of {@link SinkApiV2Resource}.
  */
 @PrepareForTest({Utils.class, SinkConfigUtils.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SinkApiV2ResourceTest {
 
@@ -87,16 +86,6 @@ public class SinkApiV2ResourceTest {
         return new org.powermock.modules.testng.PowerMockObjectFactory();
     }
 
-    private static final class TestSink implements Sink<String> {
-
-        @Override public void open(final Map<String, Object> config, 
SinkContext sinkContext) {
-        }
-
-        @Override public void write(Record<String> record) { }
-
-        @Override public void close() { }
-    }
-
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String sink = "test-sink";
@@ -105,9 +94,12 @@ public class SinkApiV2ResourceTest {
         
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
TopicSchema.DEFAULT_SERDE);
     }
     private static final String subscriptionName = "test-subscription";
-    private static final String className = TestSink.class.getName();
-    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final String className = 
CassandraStringSink.class.getName();
     private static final int parallelism = 1;
+    private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar";
+    private static final String INVALID_JAR_FILE_NAME = 
"pulsar-io-twitter.nar";
+    private String JAR_FILE_PATH;
+    private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
@@ -135,6 +127,14 @@ public class SinkApiV2ResourceTest {
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
 
+        URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
+        if (file == null)  {
+            throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
+        }
+        JAR_FILE_PATH = file.getFile();
+        INVALID_JAR_FILE_PATH = 
Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
+
+
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
@@ -146,9 +146,6 @@ public class SinkApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        mockStatic(SinkConfigUtils.class);
-        when(SinkConfigUtils.convert(anyObject(), 
anyObject())).thenReturn(FunctionDetails.newBuilder().build());
-        when(SinkConfigUtils.validate(any(), any(), any(), 
any())).thenReturn(null);
         
Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
     }
 
@@ -167,6 +164,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -181,6 +179,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
@@ -195,6 +194,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Sink Name is not provided");
     }
 
@@ -209,7 +209,8 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "Sink Package is not provided");
     }
 
     @Test
@@ -223,7 +224,84 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "zip file is empty");
+    }
+
+    @Test
+    public void testRegisterSinkInvalidJarNoSink() throws IOException {
+        FileInputStream inputStream = new 
FileInputStream(INVALID_JAR_FILE_PATH);
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                inputStream,
+                null,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                null,
+                "Failed to extract sink class from archive");
+    }
+
+    @Test
+    public void testRegisterSinkNoInput() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                className,
+                parallelism,
+                null,
+                "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs");
+    }
+
+    @Test
+    public void testRegisterSinkNegativeParallelism() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                -2,
+                null,
+                "Sink parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterSinkZeroParallelism() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                0,
+                null,
+                "Sink parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterSinkHttpUrl() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                null,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
     }
 
     private void testRegisterSinkMissingArguments(
@@ -235,6 +313,7 @@ public class SinkApiV2ResourceTest {
             Map<String, String> inputTopicMap,
             String className,
             Integer parallelism,
+            String pkgUrl,
             String errorExpected) throws IOException {
         SinkConfig sinkConfig = new SinkConfig();
         if (tenant != null) {
@@ -262,7 +341,7 @@ public class SinkApiV2ResourceTest {
                 sink,
                 inputStream,
                 details,
-                null,
+                pkgUrl,
                 null,
                 new Gson().toJson(sinkConfig),
                 FunctionsImpl.SINK,
@@ -272,7 +351,7 @@ public class SinkApiV2ResourceTest {
         Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSink() {
+    private Response registerDefaultSink() throws IOException {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(tenant);
         sinkConfig.setNamespace(namespace);
@@ -284,7 +363,7 @@ public class SinkApiV2ResourceTest {
             tenant,
             namespace,
                 sink,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -440,7 +519,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Sink Package is not provided");
     }
 
     @Test
@@ -454,7 +533,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "zip file is empty");
     }
 
     private void testUpdateSinkMissingArguments(
@@ -518,7 +597,7 @@ public class SinkApiV2ResourceTest {
             tenant,
             namespace,
                 sink,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -577,8 +656,7 @@ public class SinkApiV2ResourceTest {
     public void testUpdateSinkWithUrl() throws IOException {
         Configurator.setRootLevel(Level.DEBUG);
 
-        String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String filePackageUrl = "file://" + fileLocation;
+        String filePackageUrl = "file://" + JAR_FILE_PATH;
 
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 2002b3a..3a4785f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -26,7 +26,6 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -36,8 +35,7 @@ import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -51,9 +49,9 @@ import org.testng.annotations.Test;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
+import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.*;
@@ -69,8 +67,8 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SourceApiV2Resource}.
  */
-@PrepareForTest({Utils.class,SourceConfigUtils.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@PrepareForTest({Utils.class})
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SourceApiV2ResourceTest {
 
@@ -79,24 +77,17 @@ public class SourceApiV2ResourceTest {
         return new org.powermock.modules.testng.PowerMockObjectFactory();
     }
 
-    private static final class TestSource implements Source<String> {
-
-        @Override public void open(final Map<String, Object> config, 
SourceContext sourceContext) {
-        }
-
-        @Override public Record<String> read() { return null; }
-
-        @Override public void close() { }
-    }
-
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String source = "test-source";
     private static final String outputTopic = "test-output-topic";
     private static final String outputSerdeClassName = 
TopicSchema.DEFAULT_SERDE;
-    private static final String className = TestSource.class.getName();
-    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final String className = TwitterFireHose.class.getName();
     private static final int parallelism = 1;
+    private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar";
+    private static final String INVALID_JAR_FILE_NAME = 
"pulsar-io-cassandra.nar";
+    private String JAR_FILE_PATH;
+    private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
@@ -124,6 +115,13 @@ public class SourceApiV2ResourceTest {
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
 
+        URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
+        if (file == null)  {
+            throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
+        }
+        JAR_FILE_PATH = file.getFile();
+        INVALID_JAR_FILE_PATH = 
Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
+
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
@@ -135,9 +133,6 @@ public class SourceApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        mockStatic(SourceConfigUtils.class);
-        when(SourceConfigUtils.convert(anyObject(), 
anyObject())).thenReturn(FunctionDetails.newBuilder().build());
-        when(SourceConfigUtils.validate(any(), any(), any(), 
any())).thenReturn(null);
         
Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
     }
 
@@ -157,6 +152,7 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -172,11 +168,12 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
     @Test
-    public void testRegisterSourceMissingFunctionName() throws IOException {
+    public void testRegisterSourceMissingSourceName() throws IOException {
         testRegisterSourceMissingArguments(
             tenant,
             namespace,
@@ -187,6 +184,7 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Source Name is not provided");
     }
 
@@ -202,7 +200,8 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "Source Package is not provided");
     }
 
     @Test
@@ -217,7 +216,58 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "zip file is empty");
+    }
+
+    @Test
+    public void testRegisterSourceInvalidJarWithNoSource() throws IOException {
+        FileInputStream inputStream = new 
FileInputStream(INVALID_JAR_FILE_PATH);
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                inputStream,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Failed to extract source class from archive");
+    }
+
+    @Test
+    public void testRegisterSourceNoOutputTopic() throws IOException {
+        FileInputStream inputStream = new FileInputStream(JAR_FILE_PATH);
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                inputStream,
+                mockedFormData,
+                null,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Topic name cannot be null");
+    }
+
+    @Test
+    public void testRegisterSourceHttpUrl() throws IOException {
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
     }
 
     private void testRegisterSourceMissingArguments(
@@ -230,7 +280,8 @@ public class SourceApiV2ResourceTest {
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String errorExpected) throws IOException {
+            String pkgUrl,
+            String errorExpected) {
         SourceConfig sourceConfig = new SourceConfig();
         if (tenant != null) {
             sourceConfig.setTenant(tenant);
@@ -260,7 +311,7 @@ public class SourceApiV2ResourceTest {
                 function,
                 inputStream,
                 details,
-                null,
+                pkgUrl,
                 null,
                 new Gson().toJson(sourceConfig),
                 FunctionsImpl.SOURCE,
@@ -270,7 +321,7 @@ public class SourceApiV2ResourceTest {
         Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSource() {
+    private Response registerDefaultSource() throws IOException {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(tenant);
         sourceConfig.setNamespace(namespace);
@@ -283,7 +334,7 @@ public class SourceApiV2ResourceTest {
             tenant,
             namespace,
                 source,
-            mockedInputStream,
+            new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -443,7 +494,7 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Source Package is not provided");
     }
 
     @Test
@@ -458,7 +509,52 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "zip file is empty");
+    }
+
+    @Test
+    public void testUpdateSourceMissingTopicName() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "Topic name cannot be null");
+    }
+
+    @Test
+    public void testUpdateSourceNegativeParallelism() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                -2,
+                "Source parallelism should positive number");
+    }
+
+    @Test
+    public void testUpdateSourceZeroParallelism() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                0,
+                "Source parallelism should positive number");
     }
 
     private void testUpdateSourceMissingArguments(
@@ -527,7 +623,7 @@ public class SourceApiV2ResourceTest {
             tenant,
             namespace,
                 source,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -586,8 +682,7 @@ public class SourceApiV2ResourceTest {
     public void testUpdateSourceWithUrl() throws IOException {
         Configurator.setRootLevel(Level.DEBUG);
 
-        String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String filePackageUrl = "file://" + fileLocation;
+        String filePackageUrl = "file://" + JAR_FILE_PATH;
 
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTopicName(outputTopic);

Reply via email to