shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931138592


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object 
output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != 
componentClassLoader) {

Review Comment:
   There is also a question about the type of input for user-defined functions. 
 In the current implementation, the consumer in the sink sets the schema 
according to the generic definition of the sink. usually, we use 
`GenericObject`, If the user-defined function does not match the generic, the 
current logic will go to the following logic instead of being sent to the sink, 
is this the expected behavior?
   
   
   
https://github.com/apache/pulsar/blob/5df15dd2edd7eeab309fea35828915c8698ea339/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L354-L360
   
   Maybe we need to validate the input parameter types of the user-defined 
function at the sink start? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to