Hello,
I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008)
with Boyuan. She was very helpful in identifying the cause, which is that
KafkaRecordCoder cannot handle a record whose key is null.
We came up with two potential solutions, but each has its pros and cons, so
I'm hoping to gather suggestions, opinions, or ideas on how to handle this
issue. Our two solutions:
1. Directly wrap the keyCoder with NullableCoder, i.e.
   NullableCoder.of(keyCoder)
   cons: backwards compatibility problem (the encoded format changes)
2. Write a completely new class, named something like
   NullableKeyKafkaRecordCoder.
   Instead of using KvCoder to encode/decode KVs, it has KeyCoder and
   ValueCoder as fields, plus a BooleanCoder that encodes/decodes a true/false
   flag for the presence of a null key. If the key is null, KeyCoder is
   skipped on both encode and decode.
   - [L63] encode(...) {
       stringCoder.encode(topic, ...);
       intCoder.encode(partition, ...);
       longCoder.encode(offset, ...);
       longCoder.encode(timestamp, ...);
       intCoder.encode(timestampType, ...);
       headerCoder.encode(...);
       if (key != null) {
         BooleanCoder.encode(false, ...);  // flag: key is present
         KeyCoder.encode(key, ...);
       } else {
         BooleanCoder.encode(true, ...);   // flag: key is null
         // skips KeyCoder when key is null
       }
       ValueCoder.encode(value, ...);
     }
   - [L74] decode(...) {
       return new KafkaRecord<>(
           stringCoder.decode(inStream),
           intCoder.decode(inStream),
           longCoder.decode(inStream),
           longCoder.decode(inStream),
           KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
           (Headers) toHeaders(headerCoder.decode(inStream)),
           BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
           ValueCoder.decode(inStream));
     }
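
To make the null-flag idea in option 2 concrete, here is a minimal, self-contained sketch. It is not Beam code: the class name NullableKeySketch is hypothetical, keys and values are assumed to be plain Strings, and java.io data streams stand in for the real KeyCoder, ValueCoder and BooleanCoder. It only illustrates writing a flag first and skipping the key when it is null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the proposed coder; uses only java.io, not Beam's Coder API.
final class NullableKeySketch {

  // Writes a "key is null" flag, then the key only if it is non-null, then the value.
  static byte[] encode(String key, String value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(key == null); // true means the key is null, as in the proposal
      if (key != null) {
        out.writeUTF(key);
      }
      out.writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads the fields in the same order they were written; returns {key, value}.
  static String[] decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      boolean keyIsNull = in.readBoolean();
      String key = keyIsNull ? null : in.readUTF(); // skip the key when the flag is set
      String value = in.readUTF();
      return new String[] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String[] withKey = decode(encode("k", "v"));
    String[] withoutKey = decode(encode(null, "v"));
    System.out.println(withKey[0] + " -> " + withKey[1]);
    System.out.println(withoutKey[0] + " -> " + withoutKey[1]);
  }
}
```

Note that the flag always adds one byte in front of the key, so in this sketch the encoded bytes differ from an encoding without the flag even when the key is present.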
Best regards,
Weiwen