vroyer edited a comment on issue #9899:
URL: https://github.com/apache/pulsar/issues/9899#issuecomment-797570247
I think Record, Sink and Source should be concrete classes parameterized with
KeyValue<Object,Object>, because we want generic connectors that work whatever
the schema or encoding is. Record would then look like this:
```
/**
 * Return the SEPARATED key if the message has one associated (stored in
 * metadata properties).
 */
default Optional<String> getKey() {
    return Optional.empty();
}

// The separated key is carried as a Base64 string; decode it to raw bytes.
default Optional<byte[]> getKeyBytes() {
    return getKey().map(s -> Base64.getDecoder().decode(s));
}

default KeyValueSchema<Object, Object> getSchema() {
    return null;
}

/**
 * Retrieves the actual data of the record.
 *
 * @return The record data
 */
KeyValue<Object, Object> getValue();

default Optional<Object> getRecordKey() {
    if (getSchema() != null) {
        switch (getSchema().getKeyValueEncodingType()) {
            case SEPARATED:
                Schema<Object> keySchema = getSchema().getKeySchema();
                // Map over the Optional so a missing key yields empty instead of throwing.
                return (keySchema != null)
                        ? getKeyBytes().map(keySchema::decode)
                        : getKey().map(k -> (Object) k);
            case INLINE:
                return Optional.ofNullable(getValue().getKey());
        }
    }
    // No schema: fall back to the key stored inside the KeyValue (may be null).
    return Optional.ofNullable(getValue().getKey());
}

default Object getRecordValue() {
    return getValue().getValue();
}
```
Sink connectors would use getRecordKey() and getRecordValue() to get the
concrete key and value objects, and they can query the schema to process them
properly, whether the key is INLINE or SEPARATED, as with Kafka Connect
:-)
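To illustrate the point for sink authors, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: `SketchRecord`, `KeyValue`, `EncodingType`, and the `inline`/`separated` factories are hypothetical stand-ins invented for this example, and the separated key is assumed to be a Base64-encoded UTF-8 string. The only thing it is meant to show is that `describe` (the "sink") never has to check the encoding type.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

public class RecordKeySketch {

    // Hypothetical stand-ins for the Pulsar types, reduced to what the sketch needs.
    public enum EncodingType { INLINE, SEPARATED }

    public static final class KeyValue {
        final Object key;
        final Object value;
        KeyValue(Object key, Object value) { this.key = key; this.value = value; }
    }

    public interface SketchRecord {
        EncodingType encodingType();
        Optional<String> getKey();   // Base64 key from message metadata (SEPARATED only)
        KeyValue getValue();

        // Resolve the key from wherever the encoding put it.
        default Optional<Object> getRecordKey() {
            if (encodingType() == EncodingType.SEPARATED) {
                // Assumption: the key bytes are UTF-8 text; a real connector would use the key schema.
                return getKey().<Object>map(
                        k -> new String(Base64.getDecoder().decode(k), StandardCharsets.UTF_8));
            }
            return Optional.ofNullable(getValue().key);
        }

        default Object getRecordValue() {
            return getValue().value;
        }
    }

    // Sink-side code: identical for INLINE and SEPARATED records.
    public static String describe(SketchRecord r) {
        return r.getRecordKey().orElse("<no key>") + " -> " + r.getRecordValue();
    }

    // Builds a record whose key lives inside the KeyValue payload.
    public static SketchRecord inline(Object key, Object value) {
        return new SketchRecord() {
            public EncodingType encodingType() { return EncodingType.INLINE; }
            public Optional<String> getKey() { return Optional.empty(); }
            public KeyValue getValue() { return new KeyValue(key, value); }
        };
    }

    // Builds a record whose key lives in the message metadata, Base64-encoded.
    public static SketchRecord separated(String rawKey, Object value) {
        String encoded = Base64.getEncoder()
                .encodeToString(rawKey.getBytes(StandardCharsets.UTF_8));
        return new SketchRecord() {
            public EncodingType encodingType() { return EncodingType.SEPARATED; }
            public Optional<String> getKey() { return Optional.of(encoded); }
            public KeyValue getValue() { return new KeyValue(null, value); }
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(inline("user-1", 42)));     // prints "user-1 -> 42"
        System.out.println(describe(separated("user-1", 42)));  // prints "user-1 -> 42"
    }
}
```

Both calls print the same line, which is the behavior I would expect a generic sink to rely on.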