eolivelli commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r576752538



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {

Review comment:
   @sijie, you initially pointed out that changing PulsarSink is not the right 
approach and that we should only work on TopicSchema:
   https://github.com/apache/pulsar/pull/9481#discussion_r570723440
   
   In fact, I believe that my PR #9481 takes the right approach, driven by your 
suggestions.
   
   I believe that this change is not enough to support my needs.
    
   BTW, if the integration test I added to #9481 works with this patch, then we 
can converge on a good solution.
   My goal is to get that use case working, in the way that is best for the 
project in the mid/long term.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

