eolivelli opened a new pull request #10211:
URL: https://github.com/apache/pulsar/pull/10211


   ### Motivation
   When you run a Sink<GenericObject> (or Sink<GenericRecord>), you are using 
AutoConsumeSchema, which is a wrapper around the actual schema attached to the 
topic.
   
   For a topic with a KeyValueSchema, you cannot access the Key Schema and the 
Value Schema, because Record#getSchema returns the AutoConsumeSchema and not 
the KeyValueSchema itself.
   
   ### Modifications
   
   - In SinkRecord#getSchema we unwrap the AutoConsumeSchema and give access 
to the actual schema set on the topic.
   - Add new integration tests that simulate real-world sinks and the possible 
cases
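
   The unwrapping idea in the first bullet can be illustrated with a minimal, 
self-contained sketch. It does not use the Pulsar API: `SimpleSchema`, 
`AutoConsumeWrapper`, `KeyValueSimpleSchema`, `PrimitiveSchema` and `unwrap` 
are hypothetical stand-ins invented for this example, and the actual code in 
SinkRecord#getSchema may differ.

```java
import java.util.Objects;

public class UnwrapSketch {
    /** Hypothetical stand-in for a schema; NOT Pulsar's Schema interface. */
    interface SimpleSchema {
        String typeName();
    }

    /** Hypothetical primitive schema (for example STRING or INT32). */
    static final class PrimitiveSchema implements SimpleSchema {
        private final String typeName;
        PrimitiveSchema(String typeName) { this.typeName = typeName; }
        @Override public String typeName() { return typeName; }
    }

    /** Hypothetical key/value schema exposing both component schemas. */
    static final class KeyValueSimpleSchema implements SimpleSchema {
        final SimpleSchema key;
        final SimpleSchema value;
        KeyValueSimpleSchema(SimpleSchema key, SimpleSchema value) {
            this.key = key;
            this.value = value;
        }
        @Override public String typeName() { return "KEY_VALUE"; }
    }

    /** Hypothetical wrapper that learns the topic schema only later. */
    static final class AutoConsumeWrapper implements SimpleSchema {
        private SimpleSchema resolved; // null until the topic schema is known
        void setResolved(SimpleSchema schema) {
            this.resolved = Objects.requireNonNull(schema);
        }
        SimpleSchema getResolved() { return resolved; }
        @Override public String typeName() { return "AUTO_CONSUME"; }
    }

    /** Returns the wrapped schema when it is known, else the input itself. */
    static SimpleSchema unwrap(SimpleSchema schema) {
        if (schema instanceof AutoConsumeWrapper) {
            SimpleSchema inner = ((AutoConsumeWrapper) schema).getResolved();
            if (inner != null) {
                return inner;
            }
        }
        return schema;
    }

    public static void main(String[] args) {
        AutoConsumeWrapper wrapper = new AutoConsumeWrapper();
        // Schema not resolved yet: the wrapper itself is returned.
        System.out.println(unwrap(wrapper).typeName());
        wrapper.setResolved(new KeyValueSimpleSchema(
                new PrimitiveSchema("STRING"), new PrimitiveSchema("INT32")));
        // Schema resolved: the caller now sees the key/value schema.
        System.out.println(unwrap(wrapper).typeName());
    }
}
```

   With the unwrapped schema in hand, a sink written against this sketch could 
cast to `KeyValueSimpleSchema` and read `key` and `value` separately, which is 
what the wrapper alone does not allow.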
   
   Two modes for the new integration tests:
   1) set up the sink and then produce messages
   2) set up the schema and then set up the sink
   
   In case 1 the sink finds a topic without a schema and downloads the schema 
at the first message.
   In case 2 the sink downloads the schema at boot.
   
   Cases tested with the integration tests:
   - primitive values
   - keyvalue of primitive values, with INLINE and SEPARATED key encoding
   - keyvalue of structs (KeyValue<AVRO, AVRO>), with INLINE and SEPARATED key 
encoding
   
   The test sets a property on the message with the expected schema type; this 
way the sink fails if the system detects the schema incorrectly.
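
   The property check described above might look roughly like the following 
sketch. It is plain Java with no Pulsar dependency: the property name 
`expectedType`, the class name and the `verify` helper are assumptions made 
for illustration, not the names used in the actual test sink.

```java
import java.util.Collections;
import java.util.Map;

public class SchemaPropertyCheckSketch {
    // Hypothetical property name; the real test may use a different key.
    static final String EXPECTED_TYPE_PROPERTY = "expectedType";

    /**
     * Fails loudly when the schema type seen by the sink does not match
     * the type the producer announced in the message properties.
     */
    static void verify(Map<String, String> properties, String actualType) {
        String expected = properties.get(EXPECTED_TYPE_PROPERTY);
        if (expected == null) {
            throw new IllegalArgumentException(
                    "Missing property " + EXPECTED_TYPE_PROPERTY);
        }
        if (!expected.equals(actualType)) {
            throw new IllegalStateException(
                    "Expected schema type " + expected + " but got " + actualType);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props =
                Collections.singletonMap(EXPECTED_TYPE_PROPERTY, "KEY_VALUE");
        verify(props, "KEY_VALUE"); // matches, so nothing is thrown
        try {
            verify(props, "AUTO_CONSUME"); // mismatch, so the check fails
        } catch (IllegalStateException e) {
            System.out.println("sink failed as intended: " + e.getMessage());
        }
    }
}
```

   Throwing from the sink makes a schema detection error visible as a test 
failure instead of letting a wrongly typed record pass silently.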
   
   ### Verifying this change
   
   This change added tests
   
   ### Documentation
   
   I am going to write documentation about how to write Pulsar IO Sinks that 
work with any schema without binding to a specific schema at compile time 
(Sink<GenericObject>)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
