dshaver1 opened a new issue, #20950: URL: https://github.com/apache/pulsar/issues/20950
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

Running standalone 3.0.0 on Ubuntu.

### Minimal reproduce step

1. Upload a sink archive package:

```bash
pulsar-admin packages upload sink://test-tenant/test-namespace/pulsar-io-elastic-search-test@0 --path /path/to/pulsar-io-elastic-search.nar --description "test jar"
```

2. Try to create a sink with a transform function defined, using the package you just uploaded:

```bash
pulsar-admin sinks create \
  --tenant test-tenant \
  --namespace test-namespace \
  --name sink_test_1 \
  -i test-tenant/transient/sink_test_1 \
  -a sink://q6/sink/pulsar-io-elastic-search-test@0 \
  --transform-function https://url/to/your/transform-function.jar \
  --transform-function-classname class.path.for.your.transform.function \
  --transform-function-config "{}"
```

### What did you expect to see?

A working sink with a transform function.

### What did you see instead?

In the standalone console logs:

```
2023-08-07T18:21:29,715-0400 [function-timer-thread-95-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - q6/sink/sink_test_3 Function Container is dead with following exception. Restarting.
java.lang.RuntimeException:
    at org.apache.pulsar.functions.runtime.process.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:410) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
    at org.apache.pulsar.functions.runtime.process.ProcessRuntime.isAlive(ProcessRuntime.java:397) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
    at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:89) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
    at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.89.Final.jar:4.1.89.Final]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```

In the function logs:

```
2023-08-07T18:20:31,382-0400 [q6/sink/sink_test_3-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [q6/sink/sink_test_3:0] Uncaught exception in Java Instance
java.lang.RuntimeException: User class must be in class path
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:113) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:917) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:251) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:290) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
    at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:592) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:525) ~[?:?]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:467) ~[?:?]
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:111) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
```

### Anything else?

I think this is caused by the main archive jar being overwritten by the transform function jar in `pulsar/download/pulsar_functions/test-tenant/test-namespace/sink_test_1/0/`.

If, instead of pre-uploading the connector package, you specify the local file path when creating the sink, you'll see two jars in the download directory for the function: one for the main archive and one for the transform function. When following my steps above, there is only one jar in that location, and it is the transform function jar, which is why we see the `java.lang.ClassNotFoundException: org.apache.pulsar.io.elasticsearch.ElasticSearchSink`.

I believe this is caused by the implementation of [getPackageFile in FunctionActioner](https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java#L168). Note that `getPackageFile` is [called twice](https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java#L120-L126) in `startFunction()`:

```java
packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation,
        InstanceUtils.calculateSubjectType(functionDetails));
if (!isEmpty(transformFunctionPkgLocation.getPackagePath())) {
    transformFunctionPackageFile = getPackageFile(functionMetaData, functionDetails, instanceId,
            transformFunctionPkgLocation, FunctionDetails.ComponentType.FUNCTION);
}
```

The first call fetches the main archive, and the second fetches the transform function.
But because the `functionMetaData` is the same for each call, the target filename generated will be the same, since it's derived solely from the `functionMetaData.functionDetails`:

```java
String[] hierarchy = functionDetails.getClassName().split("\\.");
String fileName;
if (hierarchy.length <= 0) {
    fileName = functionDetails.getClassName();
} else if (hierarchy.length == 1) {
    fileName = hierarchy[0];
} else {
    fileName = hierarchy[hierarchy.length - 2];
}
```

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
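
To make the collision described under "Anything else?" concrete, here is a minimal standalone sketch. It copies the file-name derivation quoted above into a plain method and calls it twice with the same class name, as the two `getPackageFile` calls effectively do. The class `PackageFileNameCollision`, the overload taking `isTransformFunction`, and the `-transform` suffix are hypothetical names made up for illustration; they are not part of Pulsar and are only one possible way to keep the two downloads apart. Using the sink class from the log as the input is also an assumption about what `functionDetails.getClassName()` returns here.

```java
public class PackageFileNameCollision {

    // Same logic as the snippet quoted from FunctionActioner: the name
    // depends only on the class name, not on which package is being fetched.
    static String deriveFileName(String className) {
        String[] hierarchy = className.split("\\.");
        if (hierarchy.length <= 0) {
            return className;
        } else if (hierarchy.length == 1) {
            return hierarchy[0];
        } else {
            return hierarchy[hierarchy.length - 2];
        }
    }

    // Hypothetical variant (not Pulsar code): add a suffix when the package
    // is the transform function, so the two files get distinct names.
    static String deriveFileName(String className, boolean isTransformFunction) {
        String base = deriveFileName(className);
        return isTransformFunction ? base + "-transform" : base;
    }

    public static void main(String[] args) {
        // Assumption: the class name seen by both calls is the sink class.
        String className = "org.apache.pulsar.io.elasticsearch.ElasticSearchSink";

        // Both calls share the same functionDetails, so both names are equal.
        String mainArchiveName = deriveFileName(className);
        String transformName = deriveFileName(className);
        System.out.println("current:  " + mainArchiveName + " / " + transformName);

        // With the hypothetical suffix, the names no longer collide.
        System.out.println("variant:  " + deriveFileName(className, false)
                + " / " + deriveFileName(className, true));
    }
}
```

With the current derivation both calls resolve to `elasticsearch`, so whichever package is downloaded second replaces the first. Any fix that feeds the component type or the package location into the name would avoid that; the suffix above is just the smallest change that shows the idea.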
