kasparjarek opened a new issue, #30:
URL: https://github.com/apache/pulsar-connectors/issues/30

   # Issue
   
   The Pulsar Kafka Connect adaptor uses the same converters for both data and offset storage. When `AvroConverter` is used, the adaptor relies on `MockSchemaRegistryClient`:
   - Schema IDs are stored only in the in-memory mock
   - After a restart, offset deserialization fails with "Error retrieving Avro value schema for id 1 ... Subject Not Found; error code: 40401"
   - The connector loses offset tracking
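   The failure mode can be illustrated with a stdlib-only sketch. `InMemoryRegistry` is a hypothetical stand-in for a mock registry, not `MockSchemaRegistryClient` itself; it only assumes that IDs live in process memory and start at 1 in every new instance. An ID written into offset bytes before a restart therefore cannot be resolved by the fresh instance:

```java
import java.util.HashMap;
import java.util.Map;

public class MockRegistryRestart {
    // Hypothetical stand-in for an in-memory schema registry: IDs exist only in this object.
    static class InMemoryRegistry {
        private final Map<Integer, String> schemasById = new HashMap<>();
        private int nextId = 1;

        int register(String schema) {
            int id = nextId++;
            schemasById.put(id, schema);
            return id;
        }

        String lookup(int id) {
            String schema = schemasById.get(id);
            if (schema == null) {
                throw new IllegalStateException("Schema not found for id " + id);
            }
            return schema;
        }
    }

    // Returns true if the registry can resolve the given schema ID.
    static boolean canResolve(InMemoryRegistry registry, int id) {
        try {
            registry.lookup(id);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry beforeRestart = new InMemoryRegistry();
        int offsetSchemaId = beforeRestart.register("{\"type\":\"string\"}");
        // Only the stored offset bytes (which embed the ID) survive a restart, not the registry.
        InMemoryRegistry afterRestart = new InMemoryRegistry();
        System.out.println("before restart: " + canResolve(beforeRestart, offsetSchemaId)); // true
        System.out.println("after restart: " + canResolve(afterRestart, offsetSchemaId));   // false
    }
}
```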
   
   Why is this not an issue for data messages that use Avro with the mock Schema Registry? For the data topic, the schema is published with the property `__AVRO_READ_OFFSET__`, which instructs clients to skip the first X bytes ([code link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java#L44)). But the client for offsets is hardcoded to use `Schema.BYTES`, so no such schema is published to the offset topic ([code link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java#L157)), and the offset bytes are read back through `AvroConverter`, which has to resolve the embedded schema ID.
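   A minimal sketch of what the read offset buys on the data topic, assuming the converter output follows the Confluent wire format (one magic byte `0`, then a 4-byte big-endian schema ID, then the Avro payload), so X would be 5. `WireFormat` and its methods are hypothetical names, not the adaptor's code. Skipping the header means the reader never has to resolve the ID; the offset path has no such skip:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormat {
    static final byte MAGIC_BYTE = 0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES; // magic byte + 4-byte schema ID (assumed layout)

    // Prepends the assumed header: magic byte, then the schema ID in big-endian order.
    static byte[] withHeader(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    // The schema ID a registry-backed deserializer would have to look up.
    static int schemaId(byte[] message) {
        return ByteBuffer.wrap(message, 1, Integer.BYTES).getInt();
    }

    // What a reader honoring a read offset of HEADER_LENGTH does: drop the header
    // and keep the raw Avro payload, without consulting any registry.
    static byte[] stripHeader(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        byte[] message = withHeader(1, new byte[] {10, 20, 30});
        System.out.println(message.length);                       // 8
        System.out.println(schemaId(message));                    // 1
        System.out.println(Arrays.toString(stripHeader(message))); // [10, 20, 30]
    }
}
```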
   
   # How Kafka Connect handles offset converters
   
   Kafka Connect used to have dedicated configuration properties for the offset converters, `internal.key.converter` and `internal.value.converter`. These were removed and replaced with hard-coded JSON converters with `schemas.enable` set to `false`.
   
   Here is a quote from the notable changes in version 2.0.0 ([doc link](https://kafka.apache.org/30/getting-started/upgrade/#notable-changes-in-200)):
   
   > In earlier releases, Connect’s worker configuration required the internal.key.converter and internal.value.converter properties. In 2.0, these are [no longer required](https://cwiki.apache.org/confluence/x/AZQ7B) and default to the JSON converter. You may safely remove these properties from your Connect standalone and distributed worker configurations:
   >
   > - `internal.key.converter=org.apache.kafka.connect.json.JsonConverter`
   > - `internal.key.converter.schemas.enable=false`
   > - `internal.value.converter=org.apache.kafka.connect.json.JsonConverter`
   > - `internal.value.converter.schemas.enable=false`
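   To show why schemaless JSON sidesteps the registry entirely, here is a stdlib-only sketch that renders an offset key and value roughly the way a JSON converter with `schemas.enable=false` would. The key shape `[namespace, partition]` is assumed to follow Kafka Connect's `OffsetStorageWriter` convention (namespace being the connector name); `SchemalessOffsetJson`, the partition field `filename` and the offset field `position` are hypothetical. The resulting bytes are self-describing and carry no schema ID:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class SchemalessOffsetJson {
    // Minimal JSON encoder for the types used here (null, String, Number, Boolean,
    // List, Map). A sketch only, not a general-purpose JSON library.
    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof String) {
            String s = (String) value;
            return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        if (value instanceof List) {
            StringJoiner json = new StringJoiner(",", "[", "]");
            for (Object item : (List<?>) value) {
                json.add(toJson(item));
            }
            return json.toString();
        }
        if (value instanceof Map) {
            StringJoiner json = new StringJoiner(",", "{", "}");
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                json.add(toJson(String.valueOf(entry.getKey())) + ":" + toJson(entry.getValue()));
            }
            return json.toString();
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    public static void main(String[] args) {
        // Hypothetical source partition and offset for a connector named "my-connector".
        Map<String, Object> partition = new LinkedHashMap<>();
        partition.put("filename", "input.txt");
        Map<String, Object> offset = new LinkedHashMap<>();
        offset.put("position", 42L);

        // Neither the key nor the value embeds a schema ID.
        System.out.println(toJson(List.<Object>of("my-connector", partition))); // ["my-connector",{"filename":"input.txt"}]
        System.out.println(toJson(offset));                                    // {"position":42}
    }
}
```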
   
   # Fix
   
   The adaptor should create separate converters for offsets instead of reusing the data converters, the same way Kafka Connect does.
   
   **Backward compatible fix** - For older versions of Pulsar connectors, to keep the behavior backward compatible, I propose adding new optional configuration properties. When they are set, dedicated converters are created for offsets, allowing `JsonConverter` to be used for offsets (no Schema Registry dependency) while keeping `AvroConverter` for data:
   - `offset.storage.topic.key.converter`
   - `offset.storage.topic.value.converter`
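   The backward compatible resolution could look like the following sketch. The two `offset.storage.topic.*` property names come from this proposal; treating `key.converter` / `value.converter` as the data converter settings is an assumption, and `OffsetConverterConfig` is a hypothetical helper. When the new property is absent, offsets keep using the data converter, so existing deployments behave exactly as before:

```java
import java.util.Map;

public class OffsetConverterConfig {
    // Optional properties proposed in this issue.
    static final String OFFSET_KEY_CONVERTER = "offset.storage.topic.key.converter";
    static final String OFFSET_VALUE_CONVERTER = "offset.storage.topic.value.converter";
    // Data converter properties (assumed names, as in Kafka Connect worker configs).
    static final String KEY_CONVERTER = "key.converter";
    static final String VALUE_CONVERTER = "value.converter";

    // Offset key converter: the dedicated property when set, otherwise the data key converter.
    static String offsetKeyConverter(Map<String, String> config) {
        return config.getOrDefault(OFFSET_KEY_CONVERTER, config.get(KEY_CONVERTER));
    }

    // Offset value converter: the dedicated property when set, otherwise the data value converter.
    static String offsetValueConverter(Map<String, String> config) {
        return config.getOrDefault(OFFSET_VALUE_CONVERTER, config.get(VALUE_CONVERTER));
    }

    public static void main(String[] args) {
        Map<String, String> legacy = Map.of(
                VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter");
        Map<String, String> split = Map.of(
                VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter",
                OFFSET_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
        System.out.println(offsetValueConverter(legacy)); // io.confluent.connect.avro.AvroConverter
        System.out.println(offsetValueConverter(split));  // org.apache.kafka.connect.json.JsonConverter
    }
}
```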
   
   **For a new major version**, I propose creating new offset converters hard-coded as JSON with `schemas.enable=false`, to be compatible with the Kafka Connect logic.
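   For contrast, the major-version variant ignores the data converter settings altogether. In this sketch `HardcodedOffsetConverters` and `ConverterSpec` are hypothetical; the map is what would be handed to Kafka Connect's `Converter.configure(configs, isKey)` for both the offset key and value converters:

```java
import java.util.Map;

public class HardcodedOffsetConverters {
    static final String JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";

    // Hypothetical holder: converter class name plus its configuration map.
    static final class ConverterSpec {
        final String className;
        final Map<String, String> configs;

        ConverterSpec(String className, Map<String, String> configs) {
            this.className = className;
            this.configs = configs;
        }
    }

    // Offsets always use schemaless JSON; workerConfig is accepted but deliberately
    // ignored, so data converter choices (e.g. AvroConverter) cannot leak into offsets.
    static ConverterSpec offsetConverter(Map<String, String> workerConfig) {
        return new ConverterSpec(JSON_CONVERTER, Map.of("schemas.enable", "false"));
    }

    public static void main(String[] args) {
        Map<String, String> workerConfig = Map.of(
                "value.converter", "io.confluent.connect.avro.AvroConverter");
        ConverterSpec spec = offsetConverter(workerConfig);
        // Same spec for the key and the value side; only isKey would differ.
        System.out.println(spec.className + " " + spec.configs); // org.apache.kafka.connect.json.JsonConverter {schemas.enable=false}
    }
}
```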
   
   I am happy to prepare a fix. Should I create a PR for the "major version" fix in this repo and a PR for the backward compatible fix in the https://github.com/apache/pulsar repo?
   
   ---
   
   Related issue https://github.com/apache/pulsar-connectors/issues/29
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.