eolivelli opened a new pull request #10002:
URL: https://github.com/apache/pulsar/pull/10002


   ### Motivation
   Add support for different key and value Deserializers for the Kafka Source.
   
   With this change the Kafka Connector supports non-String keys and it also 
applies the correct Schema to the Pulsar Topic.
   
   For primitive data types **we are not decoding the Kafka key/value pair into 
Java Objects**; we simply pass a reference to the internal ByteBuffer (which 
is a wrapper for a byte[]).
   
   The Schema type is decided using the `keyDeserializationClass` and 
`valueDeserializationClass` parameters that you pass to the Kafka Source 
configuration.
   
   This is the mapping:
   * ByteArrayDeserializer, ByteBufferDeserializer, BytesDeserializer: 
Schema.BYTEBUFFER
   * StringDeserializer: Schema.STRING
   * DoubleDeserializer: Schema.DOUBLE
   * FloatDeserializer: Schema.FLOAT
   * IntegerDeserializer: Schema.INT32
   * LongDeserializer: Schema.INT64
   * ShortDeserializer: Schema.INT16
   * KafkaAvroDeserializer: Schema.AVRO (schema is downloaded from the 
SchemaRegistry)
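
The mapping above could be sketched as a simple lookup. This is only an illustration, not the code in the patch: the class `SchemaMappingSketch`, the method `schemaNameFor`, and the use of plain strings in place of the Pulsar `Schema` constants are assumptions made to keep the example free of dependencies. The PR does not say what happens for an unlisted deserializer; throwing an exception here is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the deserializer-to-schema mapping listed above.
// Schema types are represented by their names only (e.g. "INT64" for Schema.INT64).
class SchemaMappingSketch {

    private static final Map<String, String> MAPPING = new HashMap<>();

    static {
        MAPPING.put("ByteArrayDeserializer", "BYTEBUFFER");
        MAPPING.put("ByteBufferDeserializer", "BYTEBUFFER");
        MAPPING.put("BytesDeserializer", "BYTEBUFFER");
        MAPPING.put("StringDeserializer", "STRING");
        MAPPING.put("DoubleDeserializer", "DOUBLE");
        MAPPING.put("FloatDeserializer", "FLOAT");
        MAPPING.put("IntegerDeserializer", "INT32");
        MAPPING.put("LongDeserializer", "INT64");
        MAPPING.put("ShortDeserializer", "INT16");
        MAPPING.put("KafkaAvroDeserializer", "AVRO");
    }

    // Accepts either a simple or a fully qualified class name and
    // returns the name of the schema type that would be applied.
    static String schemaNameFor(String deserializerClass) {
        String simpleName =
                deserializerClass.substring(deserializerClass.lastIndexOf('.') + 1);
        String schemaName = MAPPING.get(simpleName);
        if (schemaName == null) {
            // Assumption: unlisted deserializers are rejected.
            throw new IllegalArgumentException(
                    "No schema mapping for deserializer: " + deserializerClass);
        }
        return schemaName;
    }

    public static void main(String[] args) {
        System.out.println(schemaNameFor("LongDeserializer"));
        System.out.println(schemaNameFor("ByteBufferDeserializer"));
    }
}
```

The same lookup would be applied once to the value of `keyDeserializationClass` and once to the value of `valueDeserializationClass`.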
   
   If the key deserializer is StringDeserializer, we use the decoded key as the 
Pulsar key.
   If the key deserializer is not StringDeserializer, we use the **Pulsar 
KeyValue data type**, with SEPARATED key encoding.
   
   This way on the topic we have a Schema for the key and a Schema for the 
value.
   The key is encoded into the Pulsar key (SEPARATED) and so it is used for 
routing and for compaction.
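
The decision described above can be summarized in a few lines. This is a sketch under stated assumptions, not the connector's actual code: `KeyEncodingSketch`, `KeyHandling`, and `keyHandlingFor` are hypothetical names, and the deserializer is identified only by its class name.

```java
// Hypothetical sketch of how the key deserializer selects the key handling.
class KeyEncodingSketch {

    enum KeyHandling {
        // The decoded String is used directly as the Pulsar key.
        STRING_KEY,
        // The record uses the Pulsar KeyValue type with SEPARATED encoding,
        // so the encoded key is stored as the Pulsar message key.
        KEY_VALUE_SEPARATED
    }

    // Accepts either a simple or a fully qualified deserializer class name.
    static KeyHandling keyHandlingFor(String keyDeserializerClass) {
        String simpleName = keyDeserializerClass
                .substring(keyDeserializerClass.lastIndexOf('.') + 1);
        return "StringDeserializer".equals(simpleName)
                ? KeyHandling.STRING_KEY
                : KeyHandling.KEY_VALUE_SEPARATED;
    }

    public static void main(String[] args) {
        System.out.println(keyHandlingFor("StringDeserializer"));
        System.out.println(keyHandlingFor("LongDeserializer"));
    }
}
```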
   
   Limits of this patch:
   - we are not supporting byte[] keys (this is a limitation of Pulsar IO 
Record, that is to be addressed separately)
   - we are only supporting SEPARATED KeyValue encoding (adding support for 
INLINE is simple, but can be done in a separate change if some user requests 
such support)
   - there is no support for JSON payloads (this is to be implemented in 
follow-up work)
   
   ### Modifications
   - refactor AbstractKafkaSource  in order to 
   
   
   ### Verifying this change
   
   The patch introduces unit tests that cover the new code.
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? it should be documented with 
dedicated docs
   
   


