eolivelli commented on a change in pull request #9413:
URL: https://github.com/apache/pulsar/pull/9413#discussion_r568474597
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
##########
@@ -707,13 +726,17 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
}
}
+ @Test(timeOut = 20000, groups = "builtin")
+ public void testPulsarSourceStatsBuiltin() throws Exception {
+ testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
+ }
- @Test
+ @Test(timeOut = 20000)
Review comment:
I am not sure we need these timeouts.
If the test breaks, it is unlikely that the other tests will run anyway.
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);
+ workerConfig.setConnectorsDirectory(Files.createTempDirectory("test").toFile().getAbsolutePath());
Review comment:
What about naming the directory "tempconnectorsdir"?
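
A minimal sketch of what that suggestion could look like, using only `java.nio.file.Files`. The class name `TempConnectorsDirExample` and the helper `createConnectorsDir` are hypothetical and not part of the PR. Note that the string passed to `Files.createTempDirectory` is a prefix, so the actual directory name gets a generated suffix after it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempConnectorsDirExample {

    // Hypothetical helper: creates a uniquely named temporary directory
    // whose name starts with the given prefix.
    public static Path createConnectorsDir(String prefix) throws IOException {
        Path dir = Files.createTempDirectory(prefix);
        // Ask the JVM to remove the (empty) directory on normal exit.
        dir.toFile().deleteOnExit();
        return dir;
    }

    public static void main(String[] args) throws IOException {
        Path dir = createConnectorsDir("tempconnectorsdir");
        // The printed path ends with "tempconnectorsdir" plus a generated suffix.
        System.out.println(dir.toAbsolutePath());
    }
}
```

A descriptive prefix makes leftover directories easier to attribute to this test when inspecting the system temp folder.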
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
##########
@@ -371,114 +370,38 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
throw new IllegalArgumentException("Sink timeout must be a positive number");
}
- if (archivePath == null && sinkPackageFile == null) {
- throw new IllegalArgumentException("Sink package is not provided");
- }
-
- Class<?> typeArg;
- ClassLoader classLoader;
String sinkClassName = sinkConfig.getClassName();
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
-
- Exception jarClassLoaderException = null;
- Exception narClassLoaderException = null;
-
- try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sinkPackageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if sink class name is not provided, we can only try to load archive as a NAR
- if (isEmpty(sinkClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException("Sink package does not have the correct format. " +
- "Pulsar cannot determine if the package is a NAR package or JAR package." +
- "Sink classname is not provided and attempts to load it as a NAR package produced error: "
- + narClassLoaderException.getMessage());
- }
+ // if class name in sink config is not set, this should be a built-in sink
+ // thus we should try to find it class name in the NAR service definition
+ if (sinkClassName == null) {
try {
- sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);
Review comment:
If we are blindly casting to `NarClassLoader`, can we change the type of
the argument?
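
To illustrate the trade-off, here is a self-contained sketch, not the PR's code. The nested `NarClassLoader` is a hypothetical stand-in for Pulsar's class, and the three method names are invented for this example. It contrasts an unchecked cast, a narrower parameter type, and a checked cast with a clear error message.

```java
public class NarrowParameterExample {

    // Hypothetical stand-in for Pulsar's NarClassLoader; NOT the real class.
    public static class NarClassLoader extends ClassLoader {
        public String serviceDefinitionClassName() {
            return "org.example.DemoSink"; // illustrative value only
        }
    }

    // Blind cast: compiles for any ClassLoader, but throws ClassCastException
    // at run time when the loader is not a NarClassLoader.
    public static String sinkClassNameWithCast(ClassLoader loader) {
        return ((NarClassLoader) loader).serviceDefinitionClassName();
    }

    // Narrower parameter type: callers holding the wrong loader type are
    // rejected by the compiler instead of failing at run time.
    public static String sinkClassNameTyped(NarClassLoader loader) {
        return loader.serviceDefinitionClassName();
    }

    // Checked cast: keeps the broad parameter but fails with an explicit message.
    public static String sinkClassNameChecked(ClassLoader loader) {
        if (!(loader instanceof NarClassLoader)) {
            throw new IllegalArgumentException(
                    "Class name is not set and the class loader is not a NAR class loader");
        }
        return ((NarClassLoader) loader).serviceDefinitionClassName();
    }

    public static void main(String[] args) {
        System.out.println(sinkClassNameTyped(new NarClassLoader()));
    }
}
```

If every caller already has a `NarClassLoader` in hand, the typed variant is the simplest. The checked variant may fit better if the method must also accept plain JAR class loaders when the class name is provided.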