eolivelli opened a new pull request #10211:
URL: https://github.com/apache/pulsar/pull/10211
### Motivation

When you run a `Sink<GenericObject>` (or `Sink<GenericRecord>`), you are using `AutoConsumeSchema`, which is a wrapper around the actual schema attached to the topic. For a topic with a `KeyValueSchema`, you cannot access the key schema and the value schema, because `Record#getSchema` returns the `AutoConsumeSchema` and not the `KeyValueSchema` itself.

### Modifications

- In `SinkRecord#getSchema`, unwrap the `AutoConsumeSchema` so that the sink can access the actual schema set on the topic.
- Add new integration tests that simulate real-world sinks and possible cases.

The new integration tests run in two modes:

1) set up the sink, then produce messages
2) set up the schema, then set up the sink

In case 1 the sink finds a topic without a schema and downloads the schema when the first message arrives. In case 2 the sink downloads the schema at boot.

Cases tested by the integration tests:

- primitive values
- KeyValue of primitive values, with INLINE and SEPARATED key encoding
- KeyValue of structs (`KeyValue<AVRO, AVRO>`), with INLINE and SEPARATED key encoding

The test sets a property on each message with the expected schema type, so the sink fails if there is an error in the system or if the schema is detected incorrectly.

### Verifying this change

This change added tests.

### Documentation

I am going to write documentation about how to write Pulsar IO sinks that work with any schema without binding to a specific schema at compile time (`Sink<GenericObject>`).
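As an illustration of the idea, here is a minimal sketch of the unwrapping and of the integration-test check. It does not use the real Pulsar API. The types `Schema`, `SimpleSchema`, `KeyValueSchema` and `AutoConsumeSchema` below are simplified, hypothetical stand-ins, and `unwrap`, `getActual`, `checkRecord` and the `expectedType` property name are all invented for this example. The sketch shows why a sink that only sees the wrapper cannot tell it is reading a KeyValue topic, and how unwrapping exposes the key and value schemas.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for the schema types discussed in the PR.
// These are NOT the real org.apache.pulsar.client.api classes.
public class SchemaUnwrapSketch {

    enum SchemaType { STRING, INT32, AVRO, KEY_VALUE, AUTO_CONSUME }

    enum KeyValueEncodingType { INLINE, SEPARATED }

    interface Schema {
        SchemaType getType();
    }

    static final class SimpleSchema implements Schema {
        private final SchemaType type;
        SimpleSchema(SchemaType type) { this.type = type; }
        public SchemaType getType() { return type; }
    }

    static final class KeyValueSchema implements Schema {
        final Schema keySchema;
        final Schema valueSchema;
        final KeyValueEncodingType encoding;
        KeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType encoding) {
            this.keySchema = keySchema;
            this.valueSchema = valueSchema;
            this.encoding = encoding;
        }
        public SchemaType getType() { return SchemaType.KEY_VALUE; }
    }

    // Wrapper whose real schema is only known after it has been fetched
    // (at boot, or when the first message arrives on a topic without a schema).
    static final class AutoConsumeSchema implements Schema {
        private Schema actual; // null until the topic schema is known
        void setActual(Schema actual) { this.actual = actual; }
        Schema getActual() { return actual; }
        public SchemaType getType() { return SchemaType.AUTO_CONSUME; }
    }

    // Models the described SinkRecord#getSchema behavior: hand out the schema
    // actually set on the topic instead of the AutoConsumeSchema wrapper.
    static Schema unwrap(Schema schema) {
        if (schema instanceof AutoConsumeSchema) {
            Schema actual = ((AutoConsumeSchema) schema).getActual();
            if (actual != null) {
                return actual;
            }
        }
        return schema;
    }

    // Models the integration-test check: the producer stores the expected schema
    // type in a message property, and the sink fails fast on any mismatch.
    // Without unwrap(), the sink would always see AUTO_CONSUME and fail here.
    static void checkRecord(Schema recordSchema, Map<String, String> properties) {
        Schema schema = unwrap(recordSchema);
        String expected = properties.get("expectedType");
        if (!schema.getType().name().equals(expected)) {
            throw new IllegalStateException(
                    "expected schema type " + expected + " but the sink saw " + schema.getType());
        }
        if (schema instanceof KeyValueSchema) {
            KeyValueSchema kv = (KeyValueSchema) schema;
            // The key and value schemas are reachable only after unwrapping.
            System.out.println("key=" + kv.keySchema.getType()
                    + " value=" + kv.valueSchema.getType()
                    + " encoding=" + kv.encoding);
        }
    }

    public static void main(String[] args) {
        Schema avro = new SimpleSchema(SchemaType.AVRO);
        AutoConsumeSchema auto = new AutoConsumeSchema();
        auto.setActual(new KeyValueSchema(avro, avro, KeyValueEncodingType.SEPARATED));
        // prints "key=AVRO value=AVRO encoding=SEPARATED"
        checkRecord(auto, Map.of("expectedType", "KEY_VALUE"));
    }
}
```

The check deliberately throws instead of logging. In an integration test, a sink that stops on the first wrongly detected schema makes the failure visible right away, instead of hiding it among records that were processed successfully.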
