This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e8025d5 fix issue when submitting NAR via file url (#4577)
e8025d5 is described below
commit e8025d50c5d2cf0a632ad1573308b676d0607923
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Jul 3 14:31:23 2019 -0700
fix issue when submitting NAR via file url (#4577)
* fix issue when submitting NAR via file url
* fix unit tests
* add more specific errors
* fix test
---
.../worker/PulsarFunctionLocalRunTest.java | 21 ++-
.../org/apache/pulsar/functions/LocalRunner.java | 1 -
.../pulsar/functions/utils/SinkConfigUtils.java | 145 +++++++++++++--------
.../pulsar/functions/utils/SourceConfigUtils.java | 122 ++++++++++-------
.../functions/worker/rest/api/FunctionsImpl.java | 1 -
.../functions/worker/rest/api/SinksImpl.java | 1 -
.../functions/worker/rest/api/SourcesImpl.java | 1 -
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 17 ++-
.../rest/api/v3/SourceApiV3ResourceTest.java | 5 +-
9 files changed, 192 insertions(+), 122 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index bdcd27d..db7dbf2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -599,12 +599,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSourceLocalRun(null);
}
- // TODO bug to fix involving submitting a NAR via URI
file:///tmp/pulsar-io-twitter-0.0.1.nar
-// @Test(timeOut = 20000)
-// public void testPulsarSourceLocalRunWithFile() throws Exception {
-// String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-// testPulsarSourceStats(jarFilePathUrl);
-// }
+ @Test(timeOut = 20000)
+ public void testPulsarSourceLocalRunWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSourceLocalRun(jarFilePathUrl);
+ }
@Test(timeOut = 40000)
public void testPulsarSourceLocalRunWithUrl() throws Exception {
@@ -705,11 +704,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSinkStats(null);
}
-// @Test(timeOut = 20000)
-// public void testPulsarSinkStatsWithFile() throws Exception {
-// String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-// testPulsarSinkStats(jarFilePathUrl);
-// }
+ @Test(timeOut = 20000)
+ public void testPulsarSinkStatsWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSinkStats(jarFilePathUrl);
+ }
@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 1540744..f4c6eca 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -248,7 +248,6 @@ public class LocalRunner {
.loadClass(LocalRunner.class.getName())
.getProtectionDomain().getCodeSource().getLocation().getFile();
}
- log.info("userCodeFile: {}", userCodeFile);
String builtInSource = isBuiltInSource(userCodeFile);
if (builtInSource != null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 42eedc3..f0a39b9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -42,7 +42,11 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Path;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -283,7 +287,7 @@ public class SinkConfigUtils {
}
public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path
archivePath,
- File uploadedInputStreamAsFile) {
+ File sinkPackageFile) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
@@ -318,79 +322,112 @@ public class SinkConfigUtils {
throw new IllegalArgumentException("Sink timeout must be a
positive number");
}
- String sinkClassName;
- final Class<?> typeArg;
- final ClassLoader classLoader;
- if (!isEmpty(sinkConfig.getClassName())) {
- sinkClassName = sinkConfig.getClassName();
- // We really don't know if we should use nar class loader or
regular classloader
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
- try {
- jarClassLoader =
FunctionCommon.extractClassLoader(archivePath, uploadedInputStreamAsFile);
- } catch (Exception e) {
+ if (archivePath == null && sinkPackageFile == null) {
+ throw new IllegalArgumentException("Sink package is not provided");
+ }
+
+ Class<?> typeArg;
+ ClassLoader classLoader;
+ String sinkClassName = sinkConfig.getClassName();
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath,
sinkPackageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath,
sinkPackageFile);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if sink class name is not provided, we can only try to load archive
as a NAR
+ if (isEmpty(sinkClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException("Sink package does not have
the correct format. " +
+ "Pulsar cannot determine if the package is a NAR
package or JAR package." +
+ "Sink classname is not provided and attempts to load
it as a NAR package produced error: "
+ + narClassLoaderException.getMessage());
}
try {
- narClassLoader =
FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
- } catch (Exception e) {
- }
- if (jarClassLoader == null && narClassLoader == null) {
- throw new IllegalArgumentException("Invalid Sink Package");
+ sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract Sink
class from archive", e);
}
- // We use typeArg and classLoader as arguments for lambda
functions that require them to be final
- // Thus we use these tmp vars
- Class<?> tmptypeArg;
- ClassLoader tmpclassLoader;
try {
- tmptypeArg = getSinkType(sinkClassName, narClassLoader);
- tmpclassLoader = narClassLoader;
- } catch (Exception e) {
+ typeArg = getSinkType(sinkClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in class path",
sinkClassName), e);
+ }
+
+ } else {
+ // if sink class name is provided, we need to try to load it as a
JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ typeArg = getSinkType(sinkClassName, jarClassLoader);
+ classLoader = jarClassLoader;
+ } catch (ClassNotFoundException e) {
+ // class not found in JAR try loading as a NAR and
searching for the class
+ if (narClassLoader != null) {
+ try {
+ typeArg = getSinkType(sinkClassName,
narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e1) {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in
class path", sinkClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in class
path", sinkClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
try {
- tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+ typeArg = getSinkType(sinkClassName, narClassLoader);
+ classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class
path", sinkClassName), e1);
}
- tmpclassLoader = jarClassLoader;
- }
- typeArg = tmptypeArg;
- classLoader = tmpclassLoader;
- } else if
(!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) &&
sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
{
- throw new IllegalArgumentException("Class-name must be present for
archive with file-url");
- } else {
- classLoader = FunctionCommon.extractNarClassLoader(archivePath,
uploadedInputStreamAsFile);
- if (classLoader == null) {
- throw new IllegalArgumentException("Sink Package is not
provided");
- }
- try {
- sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
- } catch (IOException e1) {
- throw new IllegalArgumentException("Failed to extract sink
class from archive", e1);
- }
- try {
- typeArg = getSinkType(sinkClassName, classLoader);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(
- String.format("Sink class %s must be in class path",
sinkClassName), e);
+ } else {
+ StringBuilder errorMsg = new StringBuilder("Sink package does
not have the correct format." +
+ " Pulsar cannot determine if the package is a NAR
package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a JAR package
produced error: " + jarClassLoaderException.getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a NAR package
produced error: " + narClassLoaderException.getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
}
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
- sinkConfig.getTopicToSerdeClassName().forEach((topicName,
serdeClassName) -> {
- ValidatorUtils.validateSerde(serdeClassName, typeArg,
classLoader, true);
- });
+ for (String serdeClassName :
sinkConfig.getTopicToSerdeClassName().values()) {
+ ValidatorUtils.validateSerde(serdeClassName, typeArg,
classLoader, true);
+ }
}
if (sinkConfig.getTopicToSchemaType() != null) {
- sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType)
-> {
+ for (String schemaType :
sinkConfig.getTopicToSchemaType().values()) {
ValidatorUtils.validateSchema(schemaType, typeArg,
classLoader, true);
- });
+ }
}
// topicsPattern does not need checks
if (sinkConfig.getInputSpecs() != null) {
- sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+ for (ConsumerConfig consumerSpec :
sinkConfig.getInputSpecs().values()) {
// Only one is set
if (!isEmpty(consumerSpec.getSerdeClassName()) &&
!isEmpty(consumerSpec.getSchemaType())) {
throw new IllegalArgumentException("Only one of
serdeClassName or schemaType should be set");
@@ -401,7 +438,7 @@ public class SinkConfigUtils {
if (!isEmpty(consumerSpec.getSchemaType())) {
ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg,
classLoader, true);
}
- });
+ }
}
return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index edbb6d7..149204c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -222,61 +222,93 @@ public class SourceConfigUtils {
if (sourceConfig.getResources() != null) {
ResourceConfigUtils.validate(sourceConfig.getResources());
}
+ if (archivePath == null && sourcePackageFile == null) {
+ throw new IllegalArgumentException("Source package is not
provided");
+ }
- String sourceClassName;
- final Class<?> typeArg;
- final ClassLoader classLoader;
- if (!isEmpty(sourceConfig.getClassName())) {
- sourceClassName = sourceConfig.getClassName();
- // We really don't know if we should use nar class loader or
regular classloader
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
- try {
- jarClassLoader =
FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
- } catch (Exception e) {
+ Class<?> typeArg;
+ ClassLoader classLoader;
+ String sourceClassName = sourceConfig.getClassName();
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath,
sourcePackageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath,
sourcePackageFile);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if source class name is not provided, we can only try to load
archive as a NAR
+ if (isEmpty(sourceClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException("Source package does not
have the correct format. " +
+ "Pulsar cannot determine if the package is a NAR
package or JAR package." +
+ "Source classname is not provided and attempts to load
it as a NAR package produced the following error.",
+ narClassLoaderException);
}
try {
- narClassLoader =
FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
- } catch (Exception e) {
- }
- if (jarClassLoader == null && narClassLoader == null) {
- throw new IllegalArgumentException("Invalid Source Package");
+ sourceClassName =
ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract source
class from archive", e);
}
- // We use typeArg and classLoader as arguments for lambda
functions that require them to be final
- // Thus we use these tmp vars
- Class<?> tmptypeArg;
- ClassLoader tmpclassLoader;
try {
- tmptypeArg = getSourceType(sourceClassName, narClassLoader);
- tmpclassLoader = narClassLoader;
- } catch (Exception e) {
+ typeArg = getSourceType(sourceClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in class path",
sourceClassName), e);
+ }
+
+ } else {
+ // if source class name is provided, we need to try to load it as
a JAR and as a NAR.
+ if (jarClassLoader != null) {
try {
- tmptypeArg = getSourceType(sourceClassName,
jarClassLoader);
+ typeArg = getSourceType(sourceClassName, jarClassLoader);
+ classLoader = jarClassLoader;
+ } catch (ClassNotFoundException e) {
+ // class not found in JAR try loading as a NAR and
searching for the class
+ if (narClassLoader != null) {
+ try {
+ typeArg = getSourceType(sourceClassName,
narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e1) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in
class path", sourceClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in
class path", sourceClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ typeArg = getSourceType(sourceClassName, narClassLoader);
+ classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class
path", sourceClassName), e1);
}
- tmpclassLoader = jarClassLoader;
- }
- typeArg = tmptypeArg;
- classLoader = tmpclassLoader;
- } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) &&
sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
{
- throw new IllegalArgumentException("Class-name must be present for
archive with file-url");
- } else {
- classLoader = FunctionCommon.extractNarClassLoader(archivePath,
sourcePackageFile);
- if (classLoader == null) {
- throw new IllegalArgumentException("Source Package is not
provided");
- }
- try {
- sourceClassName =
ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
- } catch (IOException e1) {
- throw new IllegalArgumentException("Failed to extract source
class from archive", e1);
- }
- try {
- typeArg = getSourceType(sourceClassName, classLoader);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(
- String.format("Source class %s must be in class path",
sourceClassName), e);
+ } else {
+ StringBuilder errorMsg = new StringBuilder("Source package
does not have the correct format." +
+ " Pulsar cannot determine if the package is a NAR
package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a JAR package
produced error: " + jarClassLoaderException.getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a NAR package
produced error: " + narClassLoaderException.getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index af23496..d96ed30 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -339,7 +339,6 @@ public class FunctionsImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant,
namespace, functionName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 9052266..659cb66 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -344,7 +344,6 @@ public class SinksImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant,
namespace, sinkName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index e14f167..e9cfeb0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -341,7 +341,6 @@ public class SourcesImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 793e9cd..41b3204 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
-import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
@@ -243,7 +242,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Sink Package is not provided")
+ @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Sink package is not provided")
public void testRegisterSinkMissingPackage() {
try {
testRegisterSinkMissingArguments(
@@ -283,7 +282,10 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "zip file is empty")
+ @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Sink package does not have the" +
+ " correct format. Pulsar cannot determine if the package is a NAR
package" +
+ " or JAR package.Sink classname is not provided and attempts to
load it as a NAR package produced error: " +
+ "zip file is empty")
public void testRegisterSinkMissingPackageDetails() {
try {
testRegisterSinkMissingArguments(
@@ -303,7 +305,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
+ @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Failed to extract Sink class from archive")
public void testRegisterSinkInvalidJarNoSink() throws IOException {
try {
FileInputStream inputStream = new
FileInputStream(INVALID_JAR_FILE_PATH);
@@ -948,6 +950,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(sink))).thenReturn(true);
@@ -961,7 +964,7 @@ public class SinkApiV3ResourceTest {
}
@Test
- public void testUpdateSinkWithUrl() throws IOException,
ClassNotFoundException {
+ public void testUpdateSinkWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -982,6 +985,7 @@ public class SinkApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
+ PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL",
any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
@@ -989,7 +993,6 @@ public class SinkApiV3ResourceTest {
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-
this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(),
any())).thenReturn(mockedFunctionMetaData);
@@ -1019,6 +1022,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(sink))).thenReturn(true);
@@ -1044,6 +1048,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(sink))).thenReturn(true);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 17b4595..276f3c0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -263,7 +263,7 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Source Package is not provided")
+ @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Source package is not provided")
public void testRegisterSourceMissingPackage() {
try {
testRegisterSourceMissingArguments(
@@ -979,7 +979,7 @@ public class SourceApiV3ResourceTest {
}
@Test
- public void testUpdateSourceWithUrl() throws IOException,
ClassNotFoundException {
+ public void testUpdateSourceWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -1000,6 +1000,7 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+ PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL",
any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));