kasparjarek opened a new issue, #29:
URL: https://github.com/apache/pulsar-connectors/issues/29
# Issue
The Pulsar Kafka Connect adaptor passes the full configuration map to
converters, but converter-specific properties, such as [JSON
Converter's](https://github.com/a0x8o/kafka/blob/master/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java#L38)
`key.converter.schemas.cache.size`, are ignored because the prefix
`key.converter.` is not stripped before configuring the converter.
# Expected Behavior (Kafka Connect)
The Kafka Connect Worker extracts the converter-specific config by stripping the
prefix before calling the `configure()` method ([code
link](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L468)):
```java
// "key.converter." or "value.converter."
String configPrefix = classPropertyName + ".";
// Select the entries that start with the prefix and ALSO STRIP THE PREFIX.
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
...
// Pass the stripped config into the configure() method.
plugin.configure(converterConfig, isKeyConverter);
```
- Input: `{"key.converter.schemas.cache.size": 100}`
- After stripping: `{"schemas.cache.size": 100}`
- Result: the converter sees the `schemas.cache.size` property and uses it
# Current Behavior (Pulsar Adaptor)
The Pulsar Adaptor just passes the whole config into the `configure()`
method ([code
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L114-L115))
without stripping the prefix.
- Input: `{"key.converter.schemas.cache.size": 100}`
- No stripping: the full map is passed to the converter
- Result: the converter looks for the `schemas.cache.size` property, does not
  find it, and falls back to the default
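The difference between the two behaviors can be reproduced without any Kafka dependency. The sketch below is only an illustration: `stripPrefix` is a hypothetical helper intended to mimic what `originalsWithPrefix` does (keep the entries that start with the prefix and drop the prefix), and the class name and sample values are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixStrippingDemo {

    // Hypothetical stand-in for originalsWithPrefix(prefix): keep only the
    // entries whose key starts with the prefix, and drop the prefix from the key.
    public static Map<String, Object> stripPrefix(Map<String, Object> config, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        config.put("key.converter.schemas.cache.size", 100);

        // Current adaptor behavior: the converter receives the full map,
        // so a lookup of the unprefixed name finds nothing.
        System.out.println(config.get("schemas.cache.size")); // null

        // Kafka Connect behavior: the converter receives the stripped map.
        Map<String, Object> keyConverterConfig = stripPrefix(config, "key.converter.");
        System.out.println(keyConverterConfig.get("schemas.cache.size")); // 100
    }
}
```

Note that the `key.converter` entry itself is not part of the stripped map, because it does not start with `key.converter.` (with the trailing dot).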
# Fix
Stripping the prefix can be done quite easily inside the `open()` method
([code
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L86)):
```java
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
Map<String, Object> keyConverterConfig = pulsarKafkaWorkerConfig
        .originalsWithPrefix(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".");
Map<String, Object> valueConverterConfig = pulsarKafkaWorkerConfig
        .originalsWithPrefix(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".");
```
## Implementation questions
I am happy to prepare a fix, but I need clarification on two questions:
### 1. Mock Schema Registry URL
The fix would remove the need for this hardcoded `mock` config ([code
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java#L108)).
I believe this was added because `schema.registry.url` is required by
AvroConverter and initialization was failing without it. With the fix, this
config would be correctly extracted from the prefixed
`value.converter.schema.registry.url` property (standard Kafka Connect
configuration).
However, removing this hardcoded value is a breaking change. Users relying
on the current behavior would need to explicitly configure
`value.converter.schema.registry.url` when using AvroConverter.
**Question:** Is this breaking change acceptable? If not, we can keep the
configuration overwrite and apply it to the stripped config map, even though
it is no longer needed after the fix.
### 2. `json-with-envelope` Configuration Behavior
The fix would also change how `json-with-envelope` works ([code
link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java#L58)).
**Current behavior:** Sets the top-level `schemas.enable = true/false`, which
(due to the bug) was the only way to control schema behavior for JSON
converters, since prefixed properties were ignored.
**After fix:** Prefixed properties like `key.converter.schemas.enable` would
be used instead of the top-level override, which is a breaking change.
I don't fully understand the original motivation behind this override. But
if the motivation was to introduce some level of control over the
`schemas.enable` property, then it is not needed anymore after this fix. Users
could control schema behavior via the standard `key.converter.schemas.enable`
and `value.converter.schemas.enable` properties.
**Question 2A:** Would it be acceptable to stop overriding `schemas.enable`
in this fix?
**Follow-up suggestion:** If we stop overriding converter configuration,
`json-with-envelope` would simply control whether the output record uses
Schema.BYTES (schemaless) or preserves the record's schema. I actually think
this is a useful capability and would appreciate it if it could be configured
more granularly, for key and value separately.
For example, introducing:
- `key-without-schema` (boolean) - force key to Schema.BYTES regardless of
converter
- `value-without-schema` (boolean) - force value to Schema.BYTES regardless
of converter
These properties would work with any converter (not just JSON) and provide
finer control over the Pulsar output schema.
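To make the proposal concrete, here is a minimal sketch of how the two flags might be resolved. Everything in it is hypothetical: the properties `key-without-schema` and `value-without-schema` do not exist yet, `OutputSchema` is a stand-in enum rather than a real Pulsar type, and defaulting to `false` when a flag is absent is an assumption.

```java
import java.util.Map;

public class WithoutSchemaSketch {

    // Stand-in for the Pulsar output schema choice; not a real Pulsar type.
    public enum OutputSchema { BYTES, RECORD_SCHEMA }

    // Reads a boolean flag from the connector config.
    // Assumption: an absent flag means false.
    public static boolean flag(Map<String, Object> config, String name) {
        Object value = config.get(name);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    // Hypothetical rule: force BYTES when the flag is set,
    // otherwise preserve the record's schema, regardless of converter.
    public static OutputSchema resolve(Map<String, Object> config, String flagName) {
        return flag(config, flagName) ? OutputSchema.BYTES : OutputSchema.RECORD_SCHEMA;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of("key-without-schema", true);
        System.out.println(resolve(config, "key-without-schema"));   // BYTES
        System.out.println(resolve(config, "value-without-schema")); // RECORD_SCHEMA
    }
}
```

With this shape, the key and the value are decided independently, which is the granularity that `json-with-envelope` lacks today.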
**Question 2B:** Would it be acceptable to introduce these new properties
and mark `json-with-envelope` as deprecated in this fix?
---
Because of the breaking changes, it may be best to include this fix in a new
major version release. This would avoid the confusing behavior that could
result from mixing the legacy `schemas.enable` override with the corrected
prefix-based converter configuration. What do you think?