eolivelli opened a new pull request #10002:
URL: https://github.com/apache/pulsar/pull/10002
### Motivation
Add support for different key and value Deserializers for the Kafka Source.
With this change the Kafka Connector supports non-String keys and also
applies the correct Schema to the Pulsar Topic.
For primitive data types **we are not decoding the Kafka key/value pair into
Java Objects**; we simply pass a reference to the internal ByteBuffer (which
is a wrapper for a byte[]).
The Schema type is decided using the `keyDeserializationClass` and
`valueDeserializationClass` parameters that you pass to the Kafka Source
configuration.
This is the mapping:
* ByteArrayDeserializer, ByteBufferDeserializer, BytesDeserializer: Schema.BYTEBUFFER
* StringDeserializer: Schema.STRING
* DoubleDeserializer: Schema.DOUBLE
* FloatDeserializer: Schema.FLOAT
* IntegerDeserializer: Schema.INT32
* LongDeserializer: Schema.INT64
* ShortDeserializer: Schema.INT16
* KafkaAvroDeserializer: Schema.AVRO (schema is downloaded from the
SchemaRegistry)
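The mapping above can be sketched as a simple lookup table. This is only an illustration, not the connector's actual code: the class `DeserializerSchemaMapping`, the method `schemaFor`, and the choice to throw on an unknown deserializer are all assumptions. Schemas are represented by plain strings so that the example has no Pulsar or Kafka dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the patch): maps the name of a Kafka
// deserializer class to the name of the Pulsar Schema listed in the PR.
final class DeserializerSchemaMapping {
    private static final Map<String, String> MAPPING = new LinkedHashMap<>();

    static {
        MAPPING.put("ByteArrayDeserializer", "BYTEBUFFER");
        MAPPING.put("ByteBufferDeserializer", "BYTEBUFFER");
        MAPPING.put("BytesDeserializer", "BYTEBUFFER");
        MAPPING.put("StringDeserializer", "STRING");
        MAPPING.put("DoubleDeserializer", "DOUBLE");
        MAPPING.put("FloatDeserializer", "FLOAT");
        MAPPING.put("IntegerDeserializer", "INT32");
        MAPPING.put("LongDeserializer", "INT64");
        MAPPING.put("ShortDeserializer", "INT16");
        MAPPING.put("KafkaAvroDeserializer", "AVRO");
    }

    // Accepts either a simple or a fully qualified class name.
    static String schemaFor(String deserializerClassName) {
        String simpleName = deserializerClassName.substring(
                deserializerClassName.lastIndexOf('.') + 1);
        String schema = MAPPING.get(simpleName);
        if (schema == null) {
            // Assumption: unknown deserializers are rejected in this sketch.
            throw new IllegalArgumentException(
                    "Unsupported deserializer: " + deserializerClassName);
        }
        return schema;
    }

    public static void main(String[] args) {
        System.out.println(schemaFor(
                "org.apache.kafka.common.serialization.LongDeserializer"));
        System.out.println(schemaFor("BytesDeserializer"));
    }
}
```

In the real connector the lookup is driven by the `keyDeserializationClass` and `valueDeserializationClass` settings, applied once for the key and once for the value.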
If the key deserializer is StringDeserializer, we use the decoded key as the
Pulsar key.
If the key deserializer is not StringDeserializer, we use the **Pulsar KeyValue
data type**, with SEPARATED key encoding.
This way on the topic we have a Schema for the key and a Schema for the
value.
The key is encoded into the Pulsar key (SEPARATED) and so it is used for
routing and for compaction.
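The key-handling rule described above reduces to a single branch on the configured key deserializer. The following is a minimal sketch under stated assumptions: `KeyHandlingSketch`, the `KeyHandling` enum, and `handlingFor` are hypothetical names used only for illustration and do not appear in the patch.

```java
// Hypothetical sketch (not part of the patch) of the decision between a plain
// String key and a KeyValue record with SEPARATED key encoding.
final class KeyHandlingSketch {
    enum KeyHandling {
        STRING_KEY,          // decoded key is used directly as the Pulsar key
        KEY_VALUE_SEPARATED  // KeyValue data type, key stored in the Pulsar key
    }

    // Accepts either a simple or a fully qualified class name.
    static KeyHandling handlingFor(String keyDeserializerClassName) {
        String simpleName = keyDeserializerClassName.substring(
                keyDeserializerClassName.lastIndexOf('.') + 1);
        return simpleName.equals("StringDeserializer")
                ? KeyHandling.STRING_KEY
                : KeyHandling.KEY_VALUE_SEPARATED;
    }

    public static void main(String[] args) {
        System.out.println(handlingFor(
                "org.apache.kafka.common.serialization.StringDeserializer"));
        System.out.println(handlingFor(
                "org.apache.kafka.common.serialization.LongDeserializer"));
    }
}
```

In both branches the key ends up in the Pulsar message key, which is why routing and compaction keep working for non-String keys.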
Limits of this patch:
- we are not supporting byte[] keys (this is a limitation of Pulsar IO
Record, that is to be addressed separately)
- we are only supporting SEPARATED KeyValue encoding (adding support for
INLINE is simple, but can be done in a separate change if users request
such support)
- there is no support for JSON payloads (this is to be implemented in
follow-up work)
### Modifications
- refactor AbstractKafkaSource in order to
### Verifying this change
The patch introduces unit tests that cover the new code.
### Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? it should be documented with
dedicated docs