gmiklos-ltg opened a new pull request, #20115:
URL: https://github.com/apache/pulsar/pull/20115

   Fixes #5350 
   
   ### Motivation
   
   While evaluating Pulsar Functions for our data transformation use cases, we ran into the following startup failure in a function that uses a custom SerDe class:
   `2023-04-12T11:30:35,368+0200 [public/default/user-transform-new-2-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/user-transform-new-2:0] Uncaught exception in Java Instance
   java.lang.RuntimeException: User class must be in class path
        at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:72) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.instance.InstanceUtils.createInstance(InstanceUtils.java:92) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.instance.InstanceUtils.initializeSerDe(InstanceUtils.java:51) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:238) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at java.util.HashMap.computeIfAbsent(HashMap.java:1220) ~[?:?]
        at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.source.PulsarSource.buildPulsarSourceConsumerConfig(PulsarSource.java:176) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.source.SingleConsumerPulsarSource.open(SingleConsumerPulsarSource.java:69) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:774) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:226) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:259) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
   Caused by: java.lang.ClassNotFoundException: com.bridge.data.UserMessageSerDe
        at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:587) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Class.java:467) ~[?:?]
        at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:70) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
        ... 13 more`
   
   We verified that the class is included in the fat jar that we built.
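
One way to repeat that check programmatically is sketched below. It is only an illustration: the class `JarClassCheck` and its helper names are hypothetical and not part of Pulsar; it relies solely on the JDK's `java.util.jar.JarFile`.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Pulsar): checks whether a jar contains a class.
public class JarClassCheck {

    // Maps a fully qualified class name to the entry path used inside a jar.
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath has an entry for className.
    static boolean containsClass(Path jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Example arguments: transform-function-v1.jar com.bridge.data.UserMessageSerDe
        System.out.println(containsClass(Path.of(args[0]), args[1]));
    }
}
```

A `true` result only shows that the class is packaged in the jar; it says nothing about which classloader is consulted at runtime.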
   
   config.yaml looks like this:
   ```
   tenant: public
   namespace: default
   name: user-transform
   className: com.bridge.data.TransformUserMessage
   inputs:
     - "persistent://public/default/debezium.public.users"
   customSerdeInputs:
     "persistent://public/default/debezium.public.users": com.bridge.data.UserMessageSerDe
   output: "persistent://public/default/debezium.public.users-public"
   outputSerdeClassName: com.bridge.data.PublicUserSerDe
   jar: "transform-function-v1.jar"
   parallelism: 1
   retainOrdering: true
   ```
   
   The mentioned `UserMessageSerDe` class is just a very simple wrapper around 
Jackson to deserialize UserMessage data classes.
   
   ### Modifications
   
   I modified `TopicSchema` to accept the function's classloader as a parameter. `newSchemaInstance()` now uses that classloader to resolve the custom SerDe class, so our function starts up without this error.
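
The general idea can be sketched with plain JDK calls. This is not the actual patch: `ClassLoaderSketch`, `DemoSerDe`, `createInstance`, and `canCreate` are hypothetical names for illustration, and the isolated `URLClassLoader` merely stands in for a loader that cannot see the function jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical illustration (not Pulsar code): the same class name resolves
// or fails depending on which classloader is asked to load it.
public class ClassLoaderSketch {

    // Stand-in for a user SerDe class that lives in the function jar.
    public static class DemoSerDe {
        public DemoSerDe() {}
    }

    // Loads className through the given loader and calls its no-arg constructor.
    static Object createInstance(String className, ClassLoader loader)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className, true, loader);
        return clazz.getDeclaredConstructor().newInstance();
    }

    // Returns true if the loader can resolve and instantiate className.
    static boolean canCreate(String className, ClassLoader loader) {
        try {
            createInstance(className, loader);
            return true;
        } catch (ReflectiveOperationException e) {
            // Includes ClassNotFoundException, as in the stack trace above.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = DemoSerDe.class.getName();
        // A loader with no URLs and no application parent cannot see DemoSerDe.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println(canCreate(name, isolated));
        }
        // The loader that actually defined the class can resolve it.
        System.out.println(canCreate(name, DemoSerDe.class.getClassLoader()));
    }
}
```

Passing the loader explicitly, rather than relying on whichever loader the calling code happens to use, is the design choice this change applies to `TopicSchema`.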
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is already covered by existing tests, such as `TopicSchemaTest`.
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   

