jerrypeng commented on a change in pull request #9413:
URL: https://github.com/apache/pulsar/pull/9413#discussion_r568848175
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
##########
@@ -707,13 +726,17 @@ private void testPulsarSourceLocalRun(String
jarFilePathUrl) throws Exception {
}
}
+ @Test(timeOut = 20000, groups = "builtin")
+ public void testPulsarSourceStatsBuiltin() throws Exception {
+ testPulsarSourceLocalRun(String.format("%s://data-generator",
Utils.BUILTIN));
+ }
- @Test
+ @Test(timeOut = 20000)
Review comment:
The individual timeout is in place so we don't just block for 1.5-2 hour
which is the absolute test timeout and consume resources for that time.
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
##########
@@ -371,114 +370,38 @@ public static ExtractedSinkDetails validate(SinkConfig
sinkConfig, Path archiveP
throw new IllegalArgumentException("Sink timeout must be a
positive number");
}
- if (archivePath == null && sinkPackageFile == null) {
- throw new IllegalArgumentException("Sink package is not provided");
- }
-
- Class<?> typeArg;
- ClassLoader classLoader;
String sinkClassName = sinkConfig.getClassName();
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
-
- Exception jarClassLoaderException = null;
- Exception narClassLoaderException = null;
-
- try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath,
sinkPackageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath,
sinkPackageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if sink class name is not provided, we can only try to load archive
as a NAR
- if (isEmpty(sinkClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException("Sink package does not have
the correct format. " +
- "Pulsar cannot determine if the package is a NAR
package or JAR package." +
- "Sink classname is not provided and attempts to load
it as a NAR package produced error: "
- + narClassLoaderException.getMessage());
- }
+ // if class name in sink config is not set, this should be a built-in
sink
+ // thus we should try to find it class name in the NAR service
definition
+ if (sinkClassName == null) {
try {
- sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader)
sinkClassLoader);
Review comment:
`sinkClassLoader` may not be an instance of NarClassLoader. In the line
of code, if the `sinkClassName` is not specified, we are only trying to see if
we can find a className specified in the NAR.
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -354,6 +377,8 @@ private PulsarWorkerService
createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);
+
workerConfig.setConnectorsDirectory(Files.createTempDirectory("test").toFile().getAbsolutePath());
Review comment:
sure
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]