eolivelli opened a new pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847


   ### Motivation
   
   Currently a Function cannot access the original Schema of the Message. It
   only receives AutoConsumeSchema, a special schema that is not suitable for
   producing messages.
   
   This is an example of an identity Function that picks up any message,
   regardless of its Schema, and writes it to an output topic.
   
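   ```
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.schema.GenericObject;
   import org.apache.pulsar.functions.api.Context;
   import org.apache.pulsar.functions.api.Function;
   import org.apache.pulsar.functions.api.Record;
   
   @Slf4j
   public class MyFunctionIdentityTransform implements Function<GenericObject, Void> {
   
       @Override
       public Void process(GenericObject genericObject, Context context) throws Exception {
           // The record carries the schema of the message that was consumed.
           Record<?> currentRecord = context.getCurrentRecord();
           log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
           log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
           // Forward the native object to the output topic, reusing the record's schema.
           // The raw Schema cast is needed because the concrete type is unknown at compile time.
           context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
                   .value(genericObject.getNativeObject())
                   .send();
           return null;
       }
   }
   ```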
   
   Functions of this kind must also work with KeyValue<GenericRecord,
   GenericRecord> input messages and preserve the schema properties (such as
   KeyValueEncoding.SEPARATED or the SchemaType of the components).
   
   ### Modifications
   
   Unwrap AutoConsumeSchema in PulsarSource when a Message is read from the
   Pulsar topic, and set the wrapped Schema on the PulsarRecord.
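
The unwrapping idea can be sketched in isolation. The block below is only an illustration under stated assumptions: `Schema`, `NamedSchema`, `AutoConsumeWrapper` and `unwrap` are hypothetical stand-ins, not the Pulsar classes or the code in this pull request, and falling back to the wrapper when nothing is wrapped yet is an assumption rather than documented behavior.

```java
// Hypothetical stand-in types for illustration only; these are NOT the Pulsar classes.
final class SchemaUnwrapSketch {

    /** Minimal stand-in for a schema: it only reports a type name. */
    interface Schema {
        String typeName();
    }

    /** A concrete schema, such as the one the topic was written with. */
    static final class NamedSchema implements Schema {
        private final String typeName;

        NamedSchema(String typeName) {
            this.typeName = typeName;
        }

        @Override
        public String typeName() {
            return typeName;
        }
    }

    /** Stand-in for a consume-only wrapper that holds the real schema once it is known. */
    static final class AutoConsumeWrapper implements Schema {
        private Schema wrapped; // null until the real schema has been resolved

        void setWrapped(Schema wrapped) {
            this.wrapped = wrapped;
        }

        Schema getWrapped() {
            return wrapped;
        }

        @Override
        public String typeName() {
            return "AUTO_CONSUME";
        }
    }

    /**
     * Returns the wrapped schema when the argument is a wrapper that already
     * holds one; otherwise returns the argument unchanged (assumed fallback).
     */
    static Schema unwrap(Schema schema) {
        if (schema instanceof AutoConsumeWrapper) {
            Schema inner = ((AutoConsumeWrapper) schema).getWrapped();
            return inner != null ? inner : schema;
        }
        return schema;
    }

    public static void main(String[] args) {
        AutoConsumeWrapper wrapper = new AutoConsumeWrapper();
        wrapper.setWrapped(new NamedSchema("AVRO"));
        // The record would be given the unwrapped schema, not the wrapper.
        System.out.println(unwrap(wrapper).typeName()); // prints "AVRO"
    }
}
```

With a record carrying the unwrapped schema, a Function like the one in the Motivation section could pass it directly to an output message instead of receiving the consume-only wrapper.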
   
   ### Verifying this change
   
   I will add tests
   



