This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f36c43f Move all validation/inferring missing args to serverside (#2907) f36c43f is described below commit f36c43f3592ce451176a45076db2487260aaabe2 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Nov 2 14:44:12 2018 -0700 Move all validation/inferring missing args to serverside (#2907) * Move all validation/inferring missing args to serverside * Moved tests to serverside * Fixed all tests * Moved tests from admin to backend * remove unused var * Make it explicitly narclassloader * Dont copy nar files * Copy nar files * Some tests worjing * Do not mock ConnectorIo * Fix build * Some more enhancements to the tests * Ignore io packages in powermock * Fixed unittests * Remove unused stuff * Move changes from client side to serverside * More serverside tests * cleanup * Fixed unittests --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 257 --------------------- pulsar-client-tools/pom.xml | 34 +-- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 17 +- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 26 --- .../org/apache/pulsar/admin/cli/CmdSources.java | 26 --- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 236 +------------------ .../apache/pulsar/admin/cli/TestCmdSources.java | 210 +---------------- .../pulsar/functions/utils/SinkConfigUtils.java | 8 +- .../pulsar/functions/utils/SourceConfigUtils.java | 4 +- .../pulsar/functions/utils/io/ConnectorUtils.java | 3 +- pulsar-functions/worker/pom.xml | 44 ++++ .../functions/worker/rest/api/FunctionsImpl.java | 5 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 96 +++++++- .../worker/rest/api/v2/SinkApiV2ResourceTest.java | 138 ++++++++--- .../rest/api/v2/SourceApiV2ResourceTest.java | 161 ++++++++++--- 15 files changed, 390 insertions(+), 875 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 7cd870f..077a742 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -274,38 +274,6 @@ public class CmdFunctionsTest { } @Test - public void testCreateFunctionWithHttpUrl() throws Exception { - String fnName = TEST_NAME + "-function"; - String inputTopicName = TEST_NAME + "-input-topic"; - String outputTopicName = TEST_NAME + "-output-topic"; - - ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); - consoleOutputCapturer.start(); - - final String url = "http://localhost:1234/test"; - cmd.run(new String[] { - "create", - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", url, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }); - - CreateFunction creater = cmd.getCreater(); - - consoleOutputCapturer.stop(); - String output = consoleOutputCapturer.getStderr(); - - assertTrue(output.contains("Corrupted Jar File")); - assertEquals(fnName, creater.getFunctionName()); - assertEquals(inputTopicName, creater.getInputs()); - assertEquals(outputTopicName, creater.getOutput()); - } - - @Test public void testGetFunctionStatus() throws Exception { String fnName = TEST_NAME + "-function"; String tenant = "sample"; @@ -347,60 +315,6 @@ public class CmdFunctionsTest { } @Test - public void testCreateSink() throws Exception { - String fnName = TEST_NAME + "-function"; - String inputTopicName = TEST_NAME + "-input-topic"; - - - ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); - consoleOutputCapturer.start(); - - final String url = "http://localhost:1234/test"; - cmdSinks.run(new String[] { - "create", - "--name", fnName, - "--inputs", inputTopicName, - "--archive", url, - "--tenant", "sample", - "--namespace", "ns1" - }); - - CreateSink creater = cmdSinks.getCreateSink(); - - consoleOutputCapturer.stop(); - String output = consoleOutputCapturer.getStderr(); - - assertTrue(output.contains("Corrupt User PackageFile " + url)); - assertEquals(url, creater.archive); - } - - @Test - public void testCreateSource() throws Exception { - String fnName = TEST_NAME + "-function"; - - ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); - consoleOutputCapturer.start(); - - final String url = "http://localhost:1234/test"; - cmdSources.run(new String[] { - "create", - "--name", fnName, - "--archive", url, - "--tenant", "sample", - "--namespace", "ns1", - "--destination-topic-name", "input", - }); - - CreateSource creater = cmdSources.getCreateSource(); - - consoleOutputCapturer.stop(); - String output = consoleOutputCapturer.getStderr(); - - assertTrue(output.contains("Corrupt User PackageFile " + url)); - assertEquals(url, creater.archive); - } - - @Test public void testCreateFunctionWithTopicPatterns() throws Exception { String fnName = TEST_NAME + "-function"; String topicPatterns = "persistent://tenant/ns/topicPattern*"; @@ -426,46 +340,6 @@ public class CmdFunctionsTest { } @Test - public void testCreateWithoutTenant() throws Exception { - String fnName = TEST_NAME + "-function"; - String inputTopicName = "persistent://tenant/standalone/namespace/input-topic"; - String outputTopicName = "persistent://tenant/standalone/namespace/output-topic"; - cmd.run(new String[] { - "create", - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }); - - CreateFunction creater = cmd.getCreater(); - assertEquals("public", creater.getFunctionConfig().getTenant()); - verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); - } - - @Test - public void testCreateWithoutNamespace() throws Exception { - String fnName = TEST_NAME + "-function"; - String inputTopicName = "persistent://tenant/standalone/namespace/input-topic"; - String outputTopicName = "persistent://tenant/standalone/namespace/output-topic"; - cmd.run(new String[] { - "create", - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--className", DummyFunction.class.getName(), - }); - - CreateFunction creater = cmd.getCreater(); - assertEquals("public", creater.getFunctionConfig().getTenant()); - assertEquals("default", creater.getFunctionConfig().getNamespace()); - verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); - } - - @Test public void testCreateUsingFullyQualifiedFunctionName() throws Exception { String inputTopicName = TEST_NAME + "-input-topic"; String outputTopicName = TEST_NAME + "-output-topic"; @@ -491,25 +365,6 @@ public class CmdFunctionsTest { } @Test - public void testCreateWithoutFunctionName() throws Exception { - String inputTopicName = TEST_NAME + "-input-topic"; - String outputTopicName = TEST_NAME + "-output-topic"; - cmd.run(new String[] { - "create", - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }); - - CreateFunction creater = cmd.getCreater(); - assertEquals("CmdFunctionsTest$DummyFunction", creater.getFunctionConfig().getName()); - verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); - } - - @Test public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception { String inputTopicName = TEST_NAME + "-input-topic"; cmd.run(new String[] { @@ -729,89 +584,6 @@ public class CmdFunctionsTest { } @Test - public void TestCreateFunctionParallelism() throws Exception{ - - String[] correctArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - "--parallelism", "1" - }; - - String[] incorrectArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - "--parallelism", "-1" - }; - - testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Function parallelism should positive number"); - - } - - @Test - public void TestCreateTopicName() throws Exception { - - String[] correctArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }; - - String wrongOutputTopicName = TEST_NAME + "-output-topic/test:"; - String[] incorrectArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", wrongOutputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }; - - testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Output topic " + wrongOutputTopicName + " is invalid"); - } - - @Test - public void TestCreateClassName() throws Exception { - - String[] correctArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(), - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }; - - String cannotLoadClass = "com.test.Function"; - String[] incorrectArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(), - "--tenant", "sample", - "--namespace", "ns1", - "--className", cannotLoadClass, - }; - - testValidateFunctionsConfigs(correctArgs, incorrectArgs, "User class must be in class path"); - } - - @Test public void testCreateFunctionWithCpu() throws Exception { String fnName = TEST_NAME + "-function"; String inputTopicName = TEST_NAME + "-input-topic"; @@ -998,35 +770,6 @@ public class CmdFunctionsTest { verify(functions, times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString()); } - @Test - public void TestCreateSameInOutTopic() throws Exception { - - String[] correctArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", outputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }; - - String[] incorrectArgs = new String[]{ - "--name", fnName, - "--inputs", inputTopicName, - "--output", inputTopicName, - "--jar", JAR_NAME, - "--tenant", "sample", - "--namespace", "ns1", - "--className", DummyFunction.class.getName(), - }; - - testValidateFunctionsConfigs(correctArgs, incorrectArgs, - "Output topic " + inputTopicName - + " is also being used as an input topic (topics must be one or the other)"); - - } - public static class ConsoleOutputCapturer { private ByteArrayOutputStream stdout; diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index 73faa52..8e168f3 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -70,37 +70,10 @@ <!-- functions related dependencies (begin) --> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-functions-worker</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>net.jodah</groupId> - <artifactId>typetools</artifactId> - </dependency> - - <dependency> <groupId>org.apache.bookkeeper</groupId> <artifactId>stream-storage-java-client</artifactId> </dependency> - <!-- functions related dependencies (end) --> - - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-io-cassandra</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-io-twitter</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> <build> @@ -118,12 +91,7 @@ </goals> <configuration> <tasks> - <echo>copy test sink package</echo> - <mkdir dir="${basedir}/src/test/resources"/> - <copy file="${basedir}/../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/> - <echo>copy test source package</echo> - <mkdir dir="${basedir}/src/test/resources"/> - <copy file="${basedir}/../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/> + <copy file="${basedir}/pom.xml" tofile="${basedir}/src/test/resources/dummy.nar"/> </tasks> </configuration> </execution> diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 2ec5aff..24597eb 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -65,7 +65,6 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.functions.utils.FunctionConfigUtils; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -297,9 +296,6 @@ public class CmdFunctions extends CmdBase { protected String deadLetterTopic; protected FunctionConfig functionConfig; protected String userCodeFile; - // The classLoader associated with this function defn - protected ClassLoader classLoader; - private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className; @@ -476,9 +472,6 @@ public class CmdFunctions extends CmdBase { userCodeFile = functionConfig.getPy(); } - // infer default vaues - FunctionConfigUtils.inferMissingArguments(functionConfig); - // check if configs are valid validateFunctionConfigs(functionConfig); } @@ -501,15 +494,7 @@ public class CmdFunctions extends CmdBase { } if (!isBlank(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) && !new File(functionConfig.getPy()).exists()) { - throw new ParameterException("The specified jar file does not exist"); - } - - try { - // Need to load jar and set context class loader before calling - String functionPkgUrl = Utils.isFunctionPackageUrlSupported(userCodeFile) ? userCodeFile : null; - classLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, null); - } catch (Exception e) { - throw new IllegalArgumentException(e.getMessage()); + throw new ParameterException("The specified python file does not exist"); } } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 48601b2..d14c053 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.SinkConfigUtils.inferMissingArguments; import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; @@ -38,7 +37,6 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; import java.util.stream.Collectors; @@ -51,12 +49,10 @@ import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; @@ -306,8 +302,6 @@ public class CmdSinks extends CmdBase { protected SinkConfig sinkConfig; - protected NarClassLoader classLoader; - private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName; if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern; @@ -421,8 +415,6 @@ public class CmdSinks extends CmdBase { if (null != sinkConfigString) { sinkConfig.setConfigs(parseConfigs(sinkConfigString)); } - - inferMissingArguments(sinkConfig); // check if configs are valid validateSinkConfigs(sinkConfig); @@ -445,24 +437,6 @@ public class CmdSinks extends CmdBase { throw new IllegalArgumentException(String.format("Sink Archive file %s does not exist", sinkConfig.getArchive())); } } - - try { - // Need to load jar and set context class loader before calling - String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) ? sinkConfig.getArchive() : null; - Path archivePath = (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) || sinkConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sinkConfig.getArchive()).toPath(); - classLoader = SinkConfigUtils.validate(sinkConfig, archivePath, sourcePkgUrl, null); - } catch (Exception e) { - throw new ParameterException(e.getMessage()); - } - } - - - protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkConfigProto2(SinkConfig sinkConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder - = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, classLoader)), functionDetailsBuilder); - return functionDetailsBuilder.build(); } protected String validateSinkType(String sinkType) throws IOException { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index a2271e5..32ebfc9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.SourceConfigUtils.inferMissingArguments; import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; @@ -38,7 +37,6 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; @@ -54,13 +52,10 @@ import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; -import org.apache.pulsar.functions.utils.SourceConfigUtils; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; @@ -296,8 +291,6 @@ public class CmdSources extends CmdBase { protected SourceConfig sourceConfig; - protected NarClassLoader classLoader; - private void mergeArgs() { if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName; @@ -380,8 +373,6 @@ public class CmdSources extends CmdBase { sourceConfig.setConfigs(parseConfigs(sourceConfigString)); } - inferMissingArguments(sourceConfig); - // check if source configs are valid validateSourceConfigs(sourceConfig); } @@ -402,23 +393,6 @@ public class CmdSources extends CmdBase { throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive())); } } - - try { - // Need to load jar and set context class loader before calling - String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) ? sourceConfig.getArchive() : null; - Path archivePath = (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) || sourceConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sourceConfig.getArchive()).toPath(); - classLoader = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePkgUrl, null); - } catch (Exception e) { - throw new ParameterException(e.getMessage()); - } - } - - protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder - = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, classLoader)), functionDetailsBuilder); - return functionDetailsBuilder.build(); } protected String validateSourceType(String sourceType) throws IOException { diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index aea19fd..cbe5296 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -44,8 +44,6 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.functions.utils.*; -import org.apache.pulsar.io.cassandra.CassandraStringSink; -import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -68,7 +66,7 @@ public class TestCmdSinks { private static final String TENANT = "test-tenant"; private static final String NAMESPACE = "test-namespace"; private static final String NAME = "test"; - private static final String CLASS_NAME = CassandraStringSink.class.getName(); + private static final String CLASS_NAME = "SomeRandomClassName"; private static final String INPUTS = "test-src1,test-src2"; private static final List<String> INPUTS_LIST; static { @@ -86,8 +84,7 @@ public class TestCmdSinks { private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE; private static final Integer PARALLELISM = 1; - private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar"; - private static final String WRONG_JAR_FILE_NAME = "pulsar-io-twitter.nar"; + private static final String JAR_FILE_NAME = "dummy.nar"; private String JAR_FILE_PATH; private String WRONG_JAR_PATH; private static final Double CPU = 100.0; @@ -123,7 +120,6 @@ public class TestCmdSinks { throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME); } JAR_FILE_PATH = file.getFile(); - WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile(); Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH))); } @@ -168,72 +164,6 @@ public class TestCmdSinks { } @Test - public void testMissingTenant() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setTenant(PUBLIC_TENANT); - testCmdSinkCliMissingArgs( - null, - NAMESPACE, - NAME, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - - @Test - public void testMissingNamespace() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setNamespace(DEFAULT_NAMESPACE); - testCmdSinkCliMissingArgs( - TENANT, - null, - NAME, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null") - public void testMissingName() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setName(null); - testCmdSinkCliMissingArgs( - TENANT, - NAMESPACE, - null, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - - @Test public void testMissingInput() throws Exception { SinkConfig sinkConfig = getSinkConfig(); sinkConfig.setInputSpecs(new HashMap<>()); @@ -303,27 +233,6 @@ public class TestCmdSinks { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs") - public void testMissingAllInputTopics() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - testCmdSinkCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - null, - null, - null, - PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - @Test public void testMissingProcessingGuarantees() throws Exception { SinkConfig sinkConfig = getSinkConfig(); @@ -367,51 +276,7 @@ public class TestCmdSinks { sinkConfig ); } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number") - public void testNegativeParallelism() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setParallelism(-1); - testCmdSinkCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - -1, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number") - public void testZeroParallelism() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setParallelism(0); - testCmdSinkCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - 0, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied") public void testMissingArchive() throws Exception { SinkConfig sinkConfig = getSinkConfig(); @@ -434,28 +299,6 @@ public class TestCmdSinks { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive") - public void testInvalidJarWithNoSource() throws Exception { - SinkConfig sinkConfig = getSinkConfig(); - sinkConfig.setArchive(WRONG_JAR_PATH); - testCmdSinkCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - INPUTS, - TOPIC_PATTERN, - CUSTOM_SERDE_INPUT_STRING, - PROCESSING_GUARANTEES, - PARALLELISM, - WRONG_JAR_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sinkConfig - ); - } - @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar" + " does not exist") public void testInvalidJar() throws Exception { @@ -588,36 +431,6 @@ public class TestCmdSinks { } @Test - public void testCmdSinkConfigFileMissingTenant() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setTenant(null); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setTenant(PUBLIC_TENANT); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - - @Test - public void testCmdSinkConfigFileMissingNamespace() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setNamespace(null); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setNamespace(DEFAULT_NAMESPACE); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null") - public void testCmdSinkConfigFileMissingName() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setName(null); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setName(null); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - - @Test public void testCmdSinkConfigFileMissingTopicToSerdeClassName() throws Exception { SinkConfig testSinkConfig = getSinkConfig(); @@ -633,19 +446,6 @@ public class TestCmdSinks { testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs") - public void testCmdSinkConfigFileMissingAllInput() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setInputs(null); - testSinkConfig.setTopicToSchemaType(null); - testSinkConfig.setTopicToSerdeClassName(null); - testSinkConfig.setTopicsPattern(null); - testSinkConfig.setInputSpecs(null); - - SinkConfig expectedSinkConfig = getSinkConfig(); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - @Test public void testCmdSinkConfigFileMissingConfig() throws Exception { SinkConfig testSinkConfig = getSinkConfig(); @@ -656,26 +456,6 @@ public class TestCmdSinks { testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number") - public void testCmdSinkConfigFileZeroParallelism() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setParallelism(0); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setParallelism(0); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number") - public void testCmdSinkConfigFileNegativeParallelism() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setParallelism(-1); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setParallelism(-1); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - @Test public void testCmdSinkConfigFileMissingProcessingGuarantees() throws Exception { SinkConfig testSinkConfig = getSinkConfig(); @@ -716,16 +496,6 @@ public class TestCmdSinks { testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive") - public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception { - SinkConfig testSinkConfig = getSinkConfig(); - testSinkConfig.setArchive(WRONG_JAR_PATH); - - SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setArchive(WRONG_JAR_PATH); - testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); - } - public void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig expectedSinkConfig) throws Exception { File file = Files.createTempFile("", "").toFile(); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 32a6c60..33d7513 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -70,10 +70,8 @@ public class TestCmdSources { private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE; private static final Integer PARALLELISM = 1; - private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar"; - private static final String WRONG_JAR_FILE_NAME = "pulsar-io-cassandra.nar"; + private static final String JAR_FILE_NAME = "dummy.nar"; private String JAR_FILE_PATH; - private String WRONG_JAR_PATH; private static final Double CPU = 100.0; private static final Long RAM = 1024L * 1024L; private static final Long DISK = 1024L * 1024L * 1024L; @@ -103,7 +101,6 @@ public class TestCmdSources { mockStatic(CmdFunctions.class); PowerMockito.doNothing().when(localSourceRunner).runCmd(); JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile(); - WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile(); Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH))); } @@ -142,84 +139,6 @@ public class TestCmdSources { } @Test - public void testMissingTenant() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setTenant(PUBLIC_TENANT); - testCmdSourceCliMissingArgs( - null, - NAMESPACE, - NAME, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - - @Test - public void testMissingNamespace() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setNamespace(DEFAULT_NAMESPACE); - testCmdSourceCliMissingArgs( - TENANT, - null, - NAME, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null") - public void testMissingName() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setName(null); - testCmdSourceCliMissingArgs( - TENANT, - NAMESPACE, - null, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null") - public void testMissingTopicName() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setTopicName(null); - testCmdSourceCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - null, - SERDE_CLASS_NAME, - PROCESSING_GUARANTEES, - PARALLELISM, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - - @Test public void testMissingSerdeClassName() throws Exception { SourceConfig sourceConfig = getSourceConfig(); sourceConfig.setSerdeClassName(null); @@ -278,44 +197,6 @@ public class TestCmdSources { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number") - public void testNegativeParallelism() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setParallelism(-1); - testCmdSourceCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - -1, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number") - public void testZeroParallelism() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setParallelism(0); - testCmdSourceCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - 0, - JAR_FILE_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied") public void testMissingArchive() throws Exception { SourceConfig sourceConfig = getSourceConfig(); @@ -355,25 +236,6 @@ public class TestCmdSources { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive") - public void testInvalidJarWithNoSource() throws Exception { - SourceConfig sourceConfig = getSourceConfig(); - sourceConfig.setArchive(WRONG_JAR_PATH); - testCmdSourceCliMissingArgs( - TENANT, - NAMESPACE, - NAME, - TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES, - PARALLELISM, - WRONG_JAR_PATH, - CPU, - RAM, - DISK, - SINK_CONFIG_STRING, - sourceConfig - ); - } - @Test public void testMissingConfig() throws Exception { SourceConfig sourceConfig = getSourceConfig(); @@ -474,46 +336,6 @@ public class TestCmdSources { } @Test - public void testCmdSourceConfigFileMissingTenant() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setTenant(null); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setTenant(PUBLIC_TENANT); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - - @Test - public void testCmdSourceConfigFileMissingNamespace() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setNamespace(null); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setNamespace(DEFAULT_NAMESPACE); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null") - public void testCmdSourceConfigFileMissingName() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setName(null); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setName(null); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null") - public void testCmdSourceConfigFileMissingTopicName() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setTopicName(null); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setTopicName(null); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - - @Test public void testCmdSourceConfigFileMissingSerdeClassname() throws Exception { SourceConfig testSourceConfig = getSourceConfig(); testSourceConfig.setSerdeClassName(null); @@ -533,26 +355,6 @@ public class TestCmdSources { testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number") - public void testCmdSourceConfigFileZeroParallelism() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setParallelism(0); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setParallelism(0); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number") - public void testCmdSourceConfigFileNegativeParallelism() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setParallelism(-1); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setParallelism(-1); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - @Test public void testCmdSourceConfigFileMissingProcessingGuarantees() throws Exception { SourceConfig testSourceConfig = getSourceConfig(); @@ -593,16 +395,6 @@ public class TestCmdSources { testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive") - public void testCmdSourceConfigFileInvalidJarNoSource() throws Exception { - SourceConfig testSourceConfig = getSourceConfig(); - testSourceConfig.setArchive(WRONG_JAR_PATH); - - SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setArchive(WRONG_JAR_PATH); - testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); - } - public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception { File file = Files.createTempFile("", "").toFile(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 558f33b..2cbd460 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -56,10 +56,10 @@ public class SinkConfigUtils { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.BUILTIN); if (!isBuiltin) { - if (sinkConfig.getArchive().startsWith(Utils.FILE)) { + if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.FILE)) { if (isBlank(sinkConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } @@ -286,9 +286,7 @@ public class SinkConfigUtils { NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); if (classLoader == null) { - // This happens at the cli for builtin. There is no need to check this since - // the actual check will be done at serverside - return null; + throw new IllegalArgumentException("Sink Package is not provided"); } String sinkClassName; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 07d593d..6ce1db3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -210,9 +210,7 @@ public class SourceConfigUtils { NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); if (classLoader == null) { - // This happens at the cli for builtin. There is no need to check this since - // the actual check will be done at serverside - return null; + throw new IllegalArgumentException("Source Package is not provided"); } String sourceClassName; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index c6feb5a..8780671 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -46,8 +46,7 @@ public class ConnectorUtils { /** * Extract the Pulsar IO Source class from a connector archive. */ - public static String getIOSourceClass(ClassLoader classLoader) throws IOException { - NarClassLoader ncl = (NarClassLoader) classLoader; + public static String getIOSourceClass(NarClassLoader ncl) throws IOException { String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 24bbd6e..eb67777 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -129,6 +129,50 @@ </exclusions> </dependency> + <!-- functions related dependencies (end) --> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-cassandra</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-twitter</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <!-- this task will copy nar files to resources for the test to work --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>compile</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <tasks> + <echo>copy test sink package</echo> + <mkdir dir="${basedir}/src/test/resources"/> + <copy file="${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/> + <echo>copy test source package</echo> + <mkdir dir="${basedir}/src/test/resources"/> + <copy file="${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/> + </tasks> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </project> diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 89e3b48..914a691 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -1058,7 +1058,7 @@ public class FunctionsImpl { private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, String functionDetailsJson, String componentConfigJson, String componentType, - String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException { + String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -1071,12 +1071,14 @@ public class FunctionsImpl { if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) { FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); + FunctionConfigUtils.inferMissingArguments(functionConfig); ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile); return FunctionConfigUtils.convert(functionConfig, clsLoader); } if (componentType.equals(SOURCE)) { Path archivePath = null; SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); + SourceConfigUtils.inferMissingArguments(sourceConfig); if (!StringUtils.isEmpty(sourceConfig.getArchive())) { String builtinArchive = sourceConfig.getArchive(); if (builtinArchive.startsWith(BUILTIN)) { @@ -1094,6 +1096,7 @@ public class FunctionsImpl { if (componentType.equals(SINK)) { Path archivePath = null; SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); + SinkConfigUtils.inferMissingArguments(sinkConfig); if (!StringUtils.isEmpty(sinkConfig.getArchive())) { String builtinArchive = sinkConfig.getArchive(); if (builtinArchive.startsWith(BUILTIN)) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index cd55916..1a7b51a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -165,6 +165,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Tenant is not provided"); } @@ -181,6 +182,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Namespace is not provided"); } @@ -197,6 +199,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Function Name is not provided"); } @@ -213,6 +216,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Function Package is not provided"); } @@ -229,6 +233,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "No input topic(s) specified for the function"); } @@ -245,6 +250,7 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Function Package is not provided"); } @@ -261,9 +267,96 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, null, parallelism, + null, "Function classname cannot be null"); } + @Test + public void testRegisterFunctionWrongClassName() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, + outputSerdeClassName, + "UnknownClass", + parallelism, + null, + "User class must be in class path"); + } + + @Test + public void testRegisterFunctionWrongParallelism() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + -2, + null, + "Function parallelism should positive number"); + } + + @Test + public void testRegisterFunctionSameInputOutput() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + topicsToSerDeClassName.keySet().iterator().next(), + outputSerdeClassName, + className, + parallelism, + null, + "Output topic " + topicsToSerDeClassName.keySet().iterator().next() + + " is also being used as an input topic (topics must be one or the other)"); + } + + @Test + public void testRegisterFunctionWrongOutputTopic() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + function + "-output-topic/test:", + outputSerdeClassName, + className, + parallelism, + null, + "Output topic " + function + "-output-topic/test:" + " is invalid"); + } + + @Test + public void testRegisterFunctionHttpUrl() throws IOException { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + null, + outputTopic, + outputSerdeClassName, + className, + parallelism, + "http://localhost:1234/test", + "Corrupted Jar File"); + } + private void testRegisterFunctionMissingArguments( String tenant, String namespace, @@ -275,6 +368,7 @@ public class FunctionApiV2ResourceTest { String outputSerdeClassName, String className, Integer parallelism, + String functionPkgUrl, String errorExpected) throws IOException { FunctionConfig functionConfig = new FunctionConfig(); if (tenant != null) { @@ -309,7 +403,7 @@ public class FunctionApiV2ResourceTest { function, inputStream, details, - null, + functionPkgUrl, null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 7bf1a46..57e3cc5 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -26,7 +26,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -38,8 +37,7 @@ import org.apache.pulsar.functions.utils.SinkConfigUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.cassandra.CassandraStringSink; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -53,8 +51,10 @@ import org.testng.annotations.Test; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -62,7 +62,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -78,7 +77,7 @@ import static org.testng.Assert.assertEquals; * Unit test of {@link SinkApiV2Resource}. */ @PrepareForTest({Utils.class, SinkConfigUtils.class}) -@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) +@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SinkApiV2ResourceTest { @@ -87,16 +86,6 @@ public class SinkApiV2ResourceTest { return new org.powermock.modules.testng.PowerMockObjectFactory(); } - private static final class TestSink implements Sink<String> { - - @Override public void open(final Map<String, Object> config, SinkContext sinkContext) { - } - - @Override public void write(Record<String> record) { } - - @Override public void close() { } - } - private static final String tenant = "test-tenant"; private static final String namespace = "test-namespace"; private static final String sink = "test-sink"; @@ -105,9 +94,12 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); } private static final String subscriptionName = "test-subscription"; - private static final String className = TestSink.class.getName(); - private static final String serde = TopicSchema.DEFAULT_SERDE; + private static final String className = CassandraStringSink.class.getName(); private static final int parallelism = 1; + private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar"; + private static final String INVALID_JAR_FILE_NAME = "pulsar-io-twitter.nar"; + private String JAR_FILE_PATH; + private String INVALID_JAR_FILE_PATH; private WorkerService mockedWorkerService; private FunctionMetaDataManager mockedManager; @@ -135,6 +127,14 @@ public class SinkApiV2ResourceTest { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); + URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); + if (file == null) { + throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME); + } + JAR_FILE_PATH = file.getFile(); + INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile(); + + // worker config WorkerConfig workerConfig = new WorkerConfig() .setWorkerId("test") @@ -146,9 +146,6 @@ public class SinkApiV2ResourceTest { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); - mockStatic(SinkConfigUtils.class); - when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build()); - when(SinkConfigUtils.validate(any(), any(), any(), any())).thenReturn(null); Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any()); } @@ -167,6 +164,7 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, + null, "Tenant is not provided"); } @@ -181,6 +179,7 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, + null, "Namespace is not provided"); } @@ -195,6 +194,7 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, + null, "Sink Name is not provided"); } @@ -209,7 +209,8 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, - "Function Package is not provided"); + null, + "Sink Package is not provided"); } @Test @@ -223,7 +224,84 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, - "Function Package is not provided"); + null, + "zip file is empty"); + } + + @Test + public void testRegisterSinkInvalidJarNoSink() throws IOException { + FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH); + testRegisterSinkMissingArguments( + tenant, + namespace, + sink, + inputStream, + null, + topicsToSerDeClassName, + className, + parallelism, + null, + "Failed to extract sink class from archive"); + } + + @Test + public void testRegisterSinkNoInput() throws IOException { + testRegisterSinkMissingArguments( + tenant, + namespace, + sink, + mockedInputStream, + mockedFormData, + null, + className, + parallelism, + null, + "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs"); + } + + @Test + public void testRegisterSinkNegativeParallelism() throws IOException { + testRegisterSinkMissingArguments( + tenant, + namespace, + sink, + mockedInputStream, + mockedFormData, + topicsToSerDeClassName, + className, + -2, + null, + "Sink parallelism should positive number"); + } + + @Test + public void testRegisterSinkZeroParallelism() throws IOException { + testRegisterSinkMissingArguments( + tenant, + namespace, + sink, + mockedInputStream, + mockedFormData, + topicsToSerDeClassName, + className, + 0, + null, + "Sink parallelism should positive number"); + } + + @Test + public void testRegisterSinkHttpUrl() throws IOException { + testRegisterSinkMissingArguments( + tenant, + namespace, + sink, + null, + null, + topicsToSerDeClassName, + className, + parallelism, + "http://localhost:1234/test", + "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)"); } private void testRegisterSinkMissingArguments( @@ -235,6 +313,7 @@ public class SinkApiV2ResourceTest { Map<String, String> inputTopicMap, String className, Integer parallelism, + String pkgUrl, String errorExpected) throws IOException { SinkConfig sinkConfig = new SinkConfig(); if (tenant != null) { @@ -262,7 +341,7 @@ public class SinkApiV2ResourceTest { sink, inputStream, details, - null, + pkgUrl, null, new Gson().toJson(sinkConfig), FunctionsImpl.SINK, @@ -272,7 +351,7 @@ public class SinkApiV2ResourceTest { Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason); } - private Response registerDefaultSink() { + private Response registerDefaultSink() throws IOException { SinkConfig sinkConfig = new SinkConfig(); sinkConfig.setTenant(tenant); sinkConfig.setNamespace(namespace); @@ -284,7 +363,7 @@ public class SinkApiV2ResourceTest { tenant, namespace, sink, - mockedInputStream, + new FileInputStream(JAR_FILE_PATH), mockedFormData, null, null, @@ -440,7 +519,7 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, - "Function Package is not provided"); + "Sink Package is not provided"); } @Test @@ -454,7 +533,7 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, - "Function Package is not provided"); + "zip file is empty"); } private void testUpdateSinkMissingArguments( @@ -518,7 +597,7 @@ public class SinkApiV2ResourceTest { tenant, namespace, sink, - mockedInputStream, + new FileInputStream(JAR_FILE_PATH), mockedFormData, null, null, @@ -577,8 +656,7 @@ public class SinkApiV2ResourceTest { public void testUpdateSinkWithUrl() throws IOException { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - String filePackageUrl = "file://" + fileLocation; + String filePackageUrl = "file://" + JAR_FILE_PATH; SinkConfig sinkConfig = new SinkConfig(); sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 2002b3a..3a4785f 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -26,7 +26,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function.*; import org.apache.pulsar.functions.runtime.RuntimeFactory; @@ -36,8 +35,7 @@ import org.apache.pulsar.functions.utils.SourceConfigUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; -import org.apache.pulsar.io.core.Source; -import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.twitter.TwitterFireHose; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -51,9 +49,9 @@ import org.testng.annotations.Test; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.*; +import java.net.URL; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import static org.mockito.Matchers.*; @@ -69,8 +67,8 @@ import static org.testng.Assert.assertEquals; /** * Unit test of {@link SourceApiV2Resource}. */ -@PrepareForTest({Utils.class,SourceConfigUtils.class}) -@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) +@PrepareForTest({Utils.class}) +@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SourceApiV2ResourceTest { @@ -79,24 +77,17 @@ public class SourceApiV2ResourceTest { return new org.powermock.modules.testng.PowerMockObjectFactory(); } - private static final class TestSource implements Source<String> { - - @Override public void open(final Map<String, Object> config, SourceContext sourceContext) { - } - - @Override public Record<String> read() { return null; } - - @Override public void close() { } - } - private static final String tenant = "test-tenant"; private static final String namespace = "test-namespace"; private static final String source = "test-source"; private static final String outputTopic = "test-output-topic"; private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE; - private static final String className = TestSource.class.getName(); - private static final String serde = TopicSchema.DEFAULT_SERDE; + private static final String className = TwitterFireHose.class.getName(); private static final int parallelism = 1; + private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar"; + private static final String INVALID_JAR_FILE_NAME = "pulsar-io-cassandra.nar"; + private String JAR_FILE_PATH; + private String INVALID_JAR_FILE_PATH; private WorkerService mockedWorkerService; private FunctionMetaDataManager mockedManager; @@ -124,6 +115,13 @@ public class SourceApiV2ResourceTest { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); + URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); + if (file == null) { + throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME); + } + JAR_FILE_PATH = file.getFile(); + INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile(); + // worker config WorkerConfig workerConfig = new WorkerConfig() .setWorkerId("test") @@ -135,9 +133,6 @@ public class SourceApiV2ResourceTest { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); - mockStatic(SourceConfigUtils.class); - when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build()); - when(SourceConfigUtils.validate(any(), any(), any(), any())).thenReturn(null); Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any()); } @@ -157,6 +152,7 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Tenant is not provided"); } @@ -172,11 +168,12 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Namespace is not provided"); } @Test - public void testRegisterSourceMissingFunctionName() throws IOException { + public void testRegisterSourceMissingSourceName() throws IOException { testRegisterSourceMissingArguments( tenant, namespace, @@ -187,6 +184,7 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, + null, "Source Name is not provided"); } @@ -202,7 +200,8 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + null, + "Source Package is not provided"); } @Test @@ -217,7 +216,58 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + null, + "zip file is empty"); + } + + @Test + public void testRegisterSourceInvalidJarWithNoSource() throws IOException { + FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH); + testRegisterSourceMissingArguments( + tenant, + namespace, + source, + inputStream, + null, + outputTopic, + outputSerdeClassName, + className, + parallelism, + null, + "Failed to extract source class from archive"); + } + + @Test + public void testRegisterSourceNoOutputTopic() throws IOException { + FileInputStream inputStream = new FileInputStream(JAR_FILE_PATH); + testRegisterSourceMissingArguments( + tenant, + namespace, + source, + inputStream, + mockedFormData, + null, + outputSerdeClassName, + className, + parallelism, + null, + "Topic name cannot be null"); + } + + @Test + public void testRegisterSourceHttpUrl() throws IOException { + testRegisterSourceMissingArguments( + tenant, + namespace, + source, + null, + null, + outputTopic, + outputSerdeClassName, + className, + parallelism, + "http://localhost:1234/test", + "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)"); } private void testRegisterSourceMissingArguments( @@ -230,7 +280,8 @@ public class SourceApiV2ResourceTest { String outputSerdeClassName, String className, Integer parallelism, - String errorExpected) throws IOException { + String pkgUrl, + String errorExpected) { SourceConfig sourceConfig = new SourceConfig(); if (tenant != null) { sourceConfig.setTenant(tenant); @@ -260,7 +311,7 @@ public class SourceApiV2ResourceTest { function, inputStream, details, - null, + pkgUrl, null, new Gson().toJson(sourceConfig), FunctionsImpl.SOURCE, @@ -270,7 +321,7 @@ public class SourceApiV2ResourceTest { Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason); } - private Response registerDefaultSource() { + private Response registerDefaultSource() throws IOException { SourceConfig sourceConfig = new SourceConfig(); sourceConfig.setTenant(tenant); sourceConfig.setNamespace(namespace); @@ -283,7 +334,7 @@ public class SourceApiV2ResourceTest { tenant, namespace, source, - mockedInputStream, + new FileInputStream(JAR_FILE_PATH), mockedFormData, null, null, @@ -443,7 +494,7 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + "Source Package is not provided"); } @Test @@ -458,7 +509,52 @@ public class SourceApiV2ResourceTest { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + "zip file is empty"); + } + + @Test + public void testUpdateSourceMissingTopicName() throws IOException { + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + mockedInputStream, + mockedFormData, + null, + outputSerdeClassName, + className, + parallelism, + "Topic name cannot be null"); + } + + @Test + public void testUpdateSourceNegativeParallelism() throws IOException { + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + mockedInputStream, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + -2, + "Source parallelism should positive number"); + } + + @Test + public void testUpdateSourceZeroParallelism() throws IOException { + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + mockedInputStream, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + 0, + "Source parallelism should positive number"); } private void testUpdateSourceMissingArguments( @@ -527,7 +623,7 @@ public class SourceApiV2ResourceTest { tenant, namespace, source, - mockedInputStream, + new FileInputStream(JAR_FILE_PATH), mockedFormData, null, null, @@ -586,8 +682,7 @@ public class SourceApiV2ResourceTest { public void testUpdateSourceWithUrl() throws IOException { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - String filePackageUrl = "file://" + fileLocation; + String filePackageUrl = "file://" + JAR_FILE_PATH; SourceConfig sourceConfig = new SourceConfig(); sourceConfig.setTopicName(outputTopic);