jerrypeng commented on a change in pull request #9413:
URL: https://github.com/apache/pulsar/pull/9413#discussion_r568848175



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
##########
@@ -707,13 +726,17 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
         }
     }
 
+    @Test(timeOut = 20000, groups = "builtin")
+    public void testPulsarSourceStatsBuiltin() throws Exception {
+        testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
+    }
 
-    @Test
+    @Test(timeOut = 20000)

Review comment:
       The individual timeout is in place so that a hung test doesn't block for 1.5-2 hours, 
which is the absolute test timeout, and consume resources for that whole time.
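
       As a rough illustration of that reasoning in plain Java (an analogy only, not how TestNG implements `timeOut`; `PerTestTimeout` and `timesOut` are hypothetical names), a per-task deadline frees the worker as soon as the budget is exceeded instead of waiting for a much larger global limit:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, for illustration only.
class PerTestTimeout {

    // Runs the body on a worker thread and reports whether it exceeded its own budget.
    static boolean timesOut(Runnable body, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(body);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // finished within the per-test budget
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the hung body right away
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow(); // release the worker thread in every case
        }
    }
}
```

       With a 20 second budget per test, a stuck test costs at most 20 seconds of executor time rather than the full suite-level limit.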

##########
File path: 
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
##########
@@ -371,114 +370,38 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
             throw new IllegalArgumentException("Sink timeout must be a positive number");
         }
 
-        if (archivePath == null && sinkPackageFile == null) {
-            throw new IllegalArgumentException("Sink package is not provided");
-        }
-
-        Class<?> typeArg;
-        ClassLoader classLoader;
         String sinkClassName = sinkConfig.getClassName();
-        ClassLoader jarClassLoader = null;
-        ClassLoader narClassLoader = null;
-
-        Exception jarClassLoaderException = null;
-        Exception narClassLoaderException = null;
-
-        try {
-            jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sinkPackageFile);
-        } catch (Exception e) {
-            jarClassLoaderException = e;
-        }
-        try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile, narExtractionDirectory);
-        } catch (Exception e) {
-            narClassLoaderException = e;
-        }
-
-        // if sink class name is not provided, we can only try to load archive as a NAR
-        if (isEmpty(sinkClassName)) {
-            if (narClassLoader == null) {
-                throw new IllegalArgumentException("Sink package does not have the correct format. " +
-                        "Pulsar cannot determine if the package is a NAR package or JAR package." +
-                        "Sink classname is not provided and attempts to load it as a NAR package produced error: "
-                        + narClassLoaderException.getMessage());
-            }
+        // if class name in sink config is not set, this should be a built-in sink
+        // thus we should try to find it class name in the NAR service definition
+        if (sinkClassName == null) {
             try {
-                sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+                sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);

Review comment:
       `sinkClassLoader` may not be an instance of `NarClassLoader`, so this cast can fail. 
On this line, if `sinkClassName` is not specified, we are only trying to see whether 
we can find a class name specified in the NAR.
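
       A guard along these lines would avoid the `ClassCastException`. This is only a sketch: `FakeNarClassLoader`, `declaredSinkClass` and `SinkClassNameGuard` are hypothetical stand-ins for illustration, not Pulsar's actual `NarClassLoader` or `ConnectorUtils` API.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Optional;

// Hypothetical stand-in for a NAR-aware class loader that knows its declared sink class.
class FakeNarClassLoader extends URLClassLoader {
    private final String sinkClass;

    FakeNarClassLoader(String sinkClass) {
        super(new URL[0], FakeNarClassLoader.class.getClassLoader());
        this.sinkClass = sinkClass;
    }

    String declaredSinkClass() {
        return sinkClass;
    }
}

// Hypothetical helper showing the instanceof check before the downcast.
class SinkClassNameGuard {

    static Optional<String> resolveSinkClassName(String configured, ClassLoader loader) {
        // An explicitly configured class name always wins.
        if (configured != null && !configured.isEmpty()) {
            return Optional.of(configured);
        }
        // Only consult the NAR definition when the loader really is NAR-based.
        if (loader instanceof FakeNarClassLoader) {
            return Optional.ofNullable(((FakeNarClassLoader) loader).declaredSinkClass());
        }
        // Plain JAR loader and no class name: nothing to resolve, and no cast attempted.
        return Optional.empty();
    }
}
```

       The caller can then turn an empty result into a descriptive `IllegalArgumentException` instead of surfacing a cast failure.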

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
         workerConfig.setAuthenticationEnabled(true);
         workerConfig.setAuthorizationEnabled(true);
 
+        workerConfig.setConnectorsDirectory(Files.createTempDirectory("test").toFile().getAbsolutePath());

Review comment:
       sure



