gmiklos-ltg opened a new pull request, #20115: URL: https://github.com/apache/pulsar/pull/20115
Fixes #5350

### Motivation

While evaluating Pulsar Functions for our data transformation use cases, we ran into the following startup failure with a function that uses a custom SerDe class:

`2023-04-12T11:30:35,368+0200 [public/default/user-transform-new-2-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/user-transform-new-2:0] Uncaught exception in Java Instance
java.lang.RuntimeException: User class must be in class path
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:72) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.InstanceUtils.createInstance(InstanceUtils.java:92) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.InstanceUtils.initializeSerDe(InstanceUtils.java:51) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:238) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1220) ~[?:?]
    at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.PulsarSource.buildPulsarSourceConsumerConfig(PulsarSource.java:176) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.SingleConsumerPulsarSource.open(SingleConsumerPulsarSource.java:69) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:774) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:226) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:259) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.ClassNotFoundException: com.bridge.data.UserMessageSerDe
    at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:587) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:467) ~[?:?]
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:70) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
    ... 13 more`

We made absolutely sure that the class is included in the fat jar that we compiled.
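The `Caused by` frames show the lookup going through `Class.forName` and a `URLClassLoader`. A class can be packaged in a jar and still be invisible to the class loader that performs the lookup. The sketch below is not Pulsar code: `ClassLoaderSketch`, `UserSerDe`, and `createInstance` are hypothetical names, and it only illustrates the general mechanism, assuming the failing lookup ran against a loader that cannot see the function jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderSketch {

    /** Hypothetical stand-in for a user-supplied SerDe class. */
    public static class UserSerDe {
    }

    /** Loads className through the given loader and instantiates it via its no-arg constructor. */
    public static Object createInstance(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, true, loader);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            // Same symptom as in the log above: the loader cannot see the class.
            throw new RuntimeException("User class must be in class path", e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        String name = UserSerDe.class.getName();

        // Loader that can see the user class (plays the role of the function class loader).
        ClassLoader visible = ClassLoaderSketch.class.getClassLoader();
        System.out.println("visible loader: " + createInstance(name, visible).getClass().getName());

        // Loader with no URLs and the bootstrap loader as parent: it cannot see user classes.
        ClassLoader isolated = new URLClassLoader(new URL[0], null);
        try {
            createInstance(name, isolated);
        } catch (RuntimeException e) {
            System.out.println("isolated loader: " + e.getCause());
        }
    }
}
```

Passing the loader explicitly, as `createInstance(name, visible)` does here, is the general idea behind resolving a user class with the loader that actually owns the user's jar.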
`config.yaml` looks like this:

```
tenant: public
namespace: default
name: user-transform
className: com.bridge.data.TransformUserMessage
inputs:
  - "persistent://public/default/debezium.public.users"
customSerdeInputs:
  "persistent://public/default/debezium.public.users": com.bridge.data.UserMessageSerDe
output: "persistent://public/default/debezium.public.users-public"
outputSerdeClassName: com.bridge.data.PublicUserSerDe
jar: "transform-function-v1.jar"
parallelism: 1
retainOrdering: true
```

The `UserMessageSerDe` class mentioned above is just a very simple wrapper around Jackson that deserializes `UserMessage` data classes.

### Modifications

I have modified `TopicSchema` so that it accepts a function class loader parameter, which `newSchemaInstance()` uses to resolve this class. With this change, our function starts up without any issues.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as `TopicSchemaTest`.

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->