dshaver1 opened a new issue, #20950:
URL: https://github.com/apache/pulsar/issues/20950

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   Running standalone 3.0.0 on ubuntu.
   
   ### Minimal reproduce step
   
   1. Upload a sink archive package:
   ```bash
   pulsar-admin packages upload \
   sink://test-tenant/test-namespace/pulsar-io-elastic-search-test@0 \
   --path /path/to/pulsar-io-elastic-search.nar \
   --description "test jar"
   ```
   2. Try to create a sink with a transform function defined, using the package you just uploaded:
   ```bash
   pulsar-admin sinks create \
   --tenant test-tenant \
   --namespace test-namespace \
   --name sink_test_1 \
   -i test-tenant/transient/sink_test_1 \
   -a sink://test-tenant/test-namespace/pulsar-io-elastic-search-test@0 \
   --transform-function https://url/to/your/transform-function.jar \
   --transform-function-classname class.path.for.your.transform.function \
   --transform-function-config "{}"
   ```
   
   ### What did you expect to see?
   
   A working sink with a transform function.
   
   ### What did you see instead?
   
   In the standalone console logs:
   
   ```
   2023-08-07T18:21:29,715-0400 [function-timer-thread-95-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - q6/sink/sink_test_3 Function Container is dead with following exception. Restarting.
   java.lang.RuntimeException:
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:410) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.isAlive(ProcessRuntime.java:397) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
        at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:89) ~[org.apache.pulsar-pulsar-functions-runtime-3.0.0.jar:3.0.0]
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.89.Final.jar:4.1.89.Final]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
   ```
   
   In the function logs:
   ```
   2023-08-07T18:20:31,382-0400 [q6/sink/sink_test_3-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [q6/sink/sink_test_3:0] Uncaught exception in Java Instance
   java.lang.RuntimeException: User class must be in class path
           at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:113) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
           at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:917) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
           at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:251) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
           at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:290) ~[org.apache.pulsar-pulsar-functions-instance-3.0.0.jar:3.0.0]
           at java.lang.Thread.run(Thread.java:833) ~[?:?]
   Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
           at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?]
           at java.lang.ClassLoader.loadClass(ClassLoader.java:592) ~[?:?]
           at java.lang.ClassLoader.loadClass(ClassLoader.java:525) ~[?:?]
           at java.lang.Class.forName0(Native Method) ~[?:?]
           at java.lang.Class.forName(Class.java:467) ~[?:?]
           at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:111) ~[org.apache.pulsar-pulsar-common-3.0.0.jar:3.0.0]
   ```
   
   ### Anything else?
   
   I think this is caused by the main archive jar being overwritten by the transform function jar in `pulsar/download/pulsar_functions/test-tenant/test-namespace/sink_test_1/0/`. If, instead of pre-uploading the connector package, you specify the local file path when creating the sink, you'll see 2 jars in the download directory for the function: one for the main archive and one for the transform function. When following my steps above, there is only 1 jar in that location, and it is the transform function jar, which is why we see the `java.lang.ClassNotFoundException: org.apache.pulsar.io.elasticsearch.ElasticSearchSink`.
   
   I believe this is caused by the implementation of [getPackageFile in FunctionActioner](https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java#L168). Note that `getPackageFile` is [called twice](https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java#L120-L126) in `startFunction()`:
   
   ```java
                   packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation,
                           InstanceUtils.calculateSubjectType(functionDetails));
                   if (!isEmpty(transformFunctionPkgLocation.getPackagePath())) {
                       transformFunctionPackageFile =
                               getPackageFile(functionMetaData, functionDetails, instanceId, transformFunctionPkgLocation,
                                       FunctionDetails.ComponentType.FUNCTION);
                   }
   ```
   
   It is called once for the main archive and once for the transform function. Because `functionMetaData` is the same for both calls, the generated target filename is also the same, since it is derived solely from `functionMetaData.functionDetails`:
   
   ```java
           String[] hierarchy = functionDetails.getClassName().split("\\.");
           String fileName;
           if (hierarchy.length <= 0) {
               fileName = functionDetails.getClassName();
           } else if (hierarchy.length == 1) {
               fileName =  hierarchy[0];
           } else {
               fileName = hierarchy[hierarchy.length - 2];
           }
   ```
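
To make the collision concrete, here is a minimal standalone sketch that copies the filename logic quoted above into a hypothetical helper. The class `DownloadFileNameCollision` and the method `deriveFileName` are names invented for this illustration and are not part of Pulsar. It assumes the sink's `functionDetails` class name is the one shown in the `ClassNotFoundException`, and it reuses the download directory from the steps above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical demo class, not Pulsar code.
public class DownloadFileNameCollision {

    // Same derivation as the snippet quoted from the issue:
    // the file name depends only on the class name in functionDetails.
    public static String deriveFileName(String className) {
        String[] hierarchy = className.split("\\.");
        String fileName;
        if (hierarchy.length <= 0) {
            fileName = className;
        } else if (hierarchy.length == 1) {
            fileName = hierarchy[0];
        } else {
            fileName = hierarchy[hierarchy.length - 2];
        }
        return fileName;
    }

    public static void main(String[] args) {
        // Assumption: class name taken from the ClassNotFoundException in the logs.
        String sinkClassName = "org.apache.pulsar.io.elasticsearch.ElasticSearchSink";

        // Download directory reported in the issue.
        Path downloadDir = Paths.get("pulsar", "download", "pulsar_functions",
                "test-tenant", "test-namespace", "sink_test_1", "0");

        // Both calls receive the same functionDetails, so both see the same class name.
        Path sinkArchiveTarget = downloadDir.resolve(deriveFileName(sinkClassName));
        Path transformJarTarget = downloadDir.resolve(deriveFileName(sinkClassName));

        System.out.println("sink archive target:  " + sinkArchiveTarget);
        System.out.println("transform jar target: " + transformJarTarget);
        System.out.println("collision: " + sinkArchiveTarget.equals(transformJarTarget));
    }
}
```

Both targets resolve to the same path, so the file downloaded second replaces the one downloaded first. That matches the single jar observed in the download directory.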
   
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

