kasparjarek opened a new issue, #29:
URL: https://github.com/apache/pulsar-connectors/issues/29

   # Issue
   
   The Pulsar Kafka Connect adaptor passes the full configuration map to converters, but converter-specific properties, such as the [JSON Converter's](https://github.com/a0x8o/kafka/blob/master/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java#L38) `key.converter.schemas.cache.size`, are ignored because the prefix `key.converter.` is not stripped before the converter is configured.
   
   # Expected Behavior (Kafka Connect)
   
   The Kafka Connect Worker extracts the converter-specific config by stripping the prefix before calling the `configure()` method ([code link](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L468)):
   
   ```java
   String configPrefix = classPropertyName + ".";  // "key.converter." or "value.converter."
   // Get the configs with the prefix and ALSO STRIP THE PREFIX
   Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
   ...
   // Pass the stripped config into the configure() method
   plugin.configure(converterConfig, isKeyConverter);
   ```
   
   Input: `{"key.converter.schemas.cache.size": 100}`
   After stripping: `{"schemas.cache.size": 100}`
   Result: Converter sees `schemas.cache.size` property and uses it
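
The stripping step can be illustrated without any Kafka dependency. The following is a minimal sketch that mimics the behavior described above for `originalsWithPrefix`; the class `PrefixStripSketch` and the helper `withPrefixStripped` are hypothetical names used only for illustration, not Kafka or Pulsar code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not the Kafka Connect implementation.
class PrefixStripSketch {

    // Returns the entries whose key starts with `prefix`, with the prefix removed.
    static Map<String, Object> withPrefixStripped(Map<String, ?> config, String prefix) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        config.put("key.converter.schemas.cache.size", 100);
        config.put("value.converter.schemas.enable", false);

        // Only the key converter's own settings remain, without the prefix.
        System.out.println(withPrefixStripped(config, "key.converter."));
        // prints {schemas.cache.size=100}
    }
}
```

Note that the `key.converter` entry itself (the converter class name) does not match the prefix `key.converter.` and is therefore left out of the stripped map.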
   
   # Current Behavior (Pulsar Adaptor)
   
   The Pulsar Adaptor just passes the whole config into the `configure()` 
method ([code 
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L114-L115))
 without stripping the prefix.
   
   Input: `{"key.converter.schemas.cache.size": 100}`
   No stripping: Full map passed to converter
   Result: Converter looks for `schemas.cache.size` property, doesn't find it, 
uses default
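
The effect of the missing stripping can be reproduced with a small stand-in for a converter. This is only a sketch: `ConverterLookupSketch` is a hypothetical class, and the default of `1000` is an assumed value for illustration rather than a figure taken from this issue.

```java
import java.util.Map;

// Hypothetical stand-in for a converter reading its own settings.
class ConverterLookupSketch {

    // Assumed default, for illustration only.
    static final int DEFAULT_CACHE_SIZE = 1000;

    // The converter looks up the unprefixed key and falls back to the default.
    static int cacheSize(Map<String, ?> configs) {
        Object value = configs.get("schemas.cache.size");
        return value == null ? DEFAULT_CACHE_SIZE : Integer.parseInt(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> full = Map.of("key.converter.schemas.cache.size", 100);
        Map<String, Object> stripped = Map.of("schemas.cache.size", 100);

        System.out.println(cacheSize(full));     // 1000: prefixed key is never found
        System.out.println(cacheSize(stripped)); // 100: setting is picked up
    }
}
```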
   
   # Fix
   
   Stripping the prefix can be done quite easily inside the `open()` method 
([code 
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L86)):
   
   ```java
   PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
   
   Map<String, Object> keyConverterConfig =
           pulsarKafkaWorkerConfig.originalsWithPrefix(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".");
   Map<String, Object> valueConverterConfig =
           pulsarKafkaWorkerConfig.originalsWithPrefix(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".");
   ```
   
   ## Implementation questions
   
   I am happy to prepare a fix, but I would need clarification on two questions:
   
   ### 1. Mock Schema Registry URL
   
   The fix would remove the need for this hardcoded `mock` config ([code 
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L108)).
   
   I believe this was added because `schema.registry.url` is required by 
AvroConverter and initialization was failing without it. With the fix, this 
config would be correctly extracted from the prefixed 
`value.converter.schema.registry.url` property (standard Kafka Connect 
configuration).
   
   However, removing this hardcoded value is a breaking change. Users relying 
on the current behavior would need to explicitly configure 
`value.converter.schema.registry.url` when using AvroConverter.
   
   **Question:** Is this breaking change acceptable? If not, we can keep the configuration overwrite and apply it to the stripped config map, even though it serves no purpose after the fix.
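
To make the trade-off concrete, here is a minimal sketch of retaining the legacy value on the stripped map. Everything in it is hypothetical: the class and method names are invented, and `"mock"` is assumed as the hardcoded value based on the description above. The `fallback` variant is an additional assumption that is not proposed in this issue; it is shown only to contrast with an unconditional overwrite.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of retaining the legacy value; not adaptor code.
class SchemaRegistryDefaultSketch {

    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    // Placeholder value assumed for illustration.
    static final String LEGACY_VALUE = "mock";

    // Unconditional overwrite applied to the already stripped map.
    static Map<String, Object> overwrite(Map<String, ?> stripped) {
        Map<String, Object> result = new HashMap<>(stripped);
        result.put(SCHEMA_REGISTRY_URL, LEGACY_VALUE);
        return result;
    }

    // Alternative (an assumption): only fill in the value when the user did not set one.
    static Map<String, Object> fallback(Map<String, ?> stripped) {
        Map<String, Object> result = new HashMap<>(stripped);
        result.putIfAbsent(SCHEMA_REGISTRY_URL, LEGACY_VALUE);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = Map.of(SCHEMA_REGISTRY_URL, "http://registry:8081");

        System.out.println(overwrite(userConfig).get(SCHEMA_REGISTRY_URL)); // mock
        System.out.println(fallback(userConfig).get(SCHEMA_REGISTRY_URL));  // http://registry:8081
        System.out.println(fallback(Map.of()).get(SCHEMA_REGISTRY_URL));    // mock
    }
}
```

An unconditional overwrite would clobber a user-supplied `value.converter.schema.registry.url`, whereas the fallback variant keeps existing setups working without a breaking change.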
   
   ### 2. `json-with-envelope` Configuration Behavior
   
   The fix would also change how `json-with-envelope` works ([code 
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java#L58)).
   
   **Current behavior:** Sets the top-level `schemas.enable = true/false`, which (due to the bug) was the only way to control schema behavior for JSON converters, since prefixed properties were ignored.
   
   **After fix:** Prefixed properties like `key.converter.schemas.enable` would be used instead of the top-level override, which introduces a breaking change.
   
   I don't fully understand the original motivation behind this override. But if the motivation was to introduce some level of control over the `schemas.enable` property, then it is no longer needed after this fix. Users could control schema behavior via the standard `key.converter.schemas.enable` and `value.converter.schemas.enable` properties.
   
   **Question 2A:** Would it be acceptable to stop overriding `schemas.enable` in this fix?
   
   **Follow-up suggestion:** If we stop overriding converter configuration, `json-with-envelope` would simply control whether the output record uses `Schema.BYTES` (schemaless) or preserves the record's schema. I actually think this is a useful capability and would appreciate it if it could be configured more granularly, for key and value separately.
   
   For example, introducing:
   - `key-without-schema` (boolean) - force key to Schema.BYTES regardless of 
converter
   - `value-without-schema` (boolean) - force value to Schema.BYTES regardless 
of converter
   
   These properties would work with any converter (not just JSON) and provide 
finer control over the Pulsar output schema.
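
A rough sketch of how such flags might be read is shown below. The property names are the ones proposed above and do not exist yet; the class, its helpers, and the use of plain strings in place of real Pulsar schema objects are all assumptions made to keep the example self-contained.

```java
import java.util.Map;

// Hypothetical sketch of the proposed per-side flags; not adaptor code.
class SchemaOverrideSketch {

    // Property names proposed in this issue; they do not exist yet.
    static final String KEY_WITHOUT_SCHEMA = "key-without-schema";
    static final String VALUE_WITHOUT_SCHEMA = "value-without-schema";

    // Reads a boolean flag, treating a missing property as false.
    static boolean flag(Map<String, ?> config, String name) {
        Object value = config.get(name);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    // Picks the output schema name for one side of the record.
    // Strings stand in for real schema objects in this sketch.
    static String outputSchema(boolean withoutSchema, String converterSchema) {
        return withoutSchema ? "BYTES" : converterSchema;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(KEY_WITHOUT_SCHEMA, true);

        System.out.println(outputSchema(flag(config, KEY_WITHOUT_SCHEMA), "JSON"));   // BYTES
        System.out.println(outputSchema(flag(config, VALUE_WITHOUT_SCHEMA), "AVRO")); // AVRO
    }
}
```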
   
   **Question 2B:** Would it be acceptable to introduce these new properties 
and mark `json-with-envelope` as deprecated in this fix?
   
   ---
   Because of the breaking changes, it may be best to include this fix in a new major version release. This would avoid the confusing behavior that could result from mixing the legacy `schemas.enable` override with the corrected prefix-based converter configuration. What do you think?

