/cc folks who commented on the issue: @Robin Qiu @Chamikara Jayalath @Alexey Romanenko @Daniel Collins
On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu wrote:

> Hello,
>
> I'm working on this issue (https://issues.apache.org/jira/browse/BEAM-12008)
> with Boyuan. She was very helpful in identifying the problem:
> KafkaRecordCoder can't handle records whose key is null.
>
> We came up with two potential solutions, but each has its pros and cons, so
> I'm hoping to gather suggestions, opinions, or other ideas on how to handle
> this issue. The two solutions are:
>
> 1. Wrap the keyCoder directly with NullableCoder, i.e.
>    NullableCoder.of(keyCoder).
>    Cons: backwards compatibility problem.
>
> 2. Write a completely new class, named something like
>    NullableKeyKafkaRecordCoder. Instead of using KvCoder to encode/decode
>    KVs, it has keyCoder and valueCoder as fields, plus a BooleanCoder that
>    encodes/decodes true/false to record whether the key is null. If the key
>    is null, keyCoder is skipped entirely.
>
>    - encode(...) {
>        stringCoder.encode(topic, ...);
>        intCoder.encode(partition, ...);
>        longCoder.encode(offset, ...);
>        longCoder.encode(timestamp, ...);
>        intCoder.encode(timestampType.ordinal(), ...);
>        headerCoder.encode(...);
>        if (key != null) {
>          booleanCoder.encode(false, ...); // flag: key is present
>          keyCoder.encode(key, ...);
>        } else {
>          booleanCoder.encode(true, ...);  // flag: key is null
>          // keyCoder is skipped when the key is null
>        }
>        valueCoder.encode(value, ...);
>      }
>
>    - decode(...) {
>        // Java evaluates constructor arguments left to right,
>        // so fields are read in the same order encode() wrote them.
>        return new KafkaRecord<>(
>            stringCoder.decode(inStream),  // topic
>            intCoder.decode(inStream),     // partition
>            longCoder.decode(inStream),    // offset
>            longCoder.decode(inStream),    // timestamp
>            KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>            (Headers) toHeaders(headerCoder.decode(inStream)),
>            booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>            valueCoder.decode(inStream));
>      }
>
> Best regards,
> Weiwen
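
One way to picture the backwards-compatibility concern with option 1, assuming the worry is that wrapping the key coder changes the bytes on the wire: the sketch below writes a record in an assumed "old" layout (no null marker) and then reads it with a decoder that expects a NullableCoder-style one-byte marker before the key. It uses only java.io, with DataOutputStream.writeUTF standing in for Beam's real key/value coders, and WrappedKeyCompatSketch and its methods are hypothetical names. It shows the general shape of the problem, not Beam's exact byte format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only: java.io streams stand in for Beam coders.
class WrappedKeyCompatSketch {

  // Assumed "old" layout: key and value written back to back, no null marker.
  static byte[] encodeOld(String key, String value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(key);
      out.writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }

  // "New" decoder: expects a one-byte marker before the key (0 = null, 1 = present),
  // mimicking (as an assumption) a NullableCoder-style wrapper around the key coder.
  static String[] decodeWrapped(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    byte marker = in.readByte();
    String key = (marker == 0) ? null : in.readUTF();
    String value = in.readUTF();
    return new String[] {key, value};
  }

  // True if bytes written in the old layout cannot be read by the new decoder.
  static boolean oldBytesBreakNewDecoder() {
    byte[] legacy = encodeOld("k", "v");
    try {
      decodeWrapped(legacy);
      return false;
    } catch (IOException e) {
      // The first byte of the old key's length prefix was read as the marker,
      // so every later read is misaligned; for this input it ends in an EOFException.
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("old bytes break new decoder: " + oldBytesBreakNewDecoder());
  }
}
```

With other inputs, misaligned reads could just as easily decode to wrong values without any exception, which is arguably worse than a clean failure when a pipeline's coder changes underneath existing encoded data.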

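Option 2's presence-flag layout can be tried out without any Beam dependency. This is a minimal sketch, assuming java.io's DataOutputStream.writeBoolean/writeUTF in place of Beam's BooleanCoder and key/value coders. NullableKeyLayoutSketch and its methods are hypothetical names, and only the key/value part of the record is modeled (topic, partition, offset, timestamp, and headers are omitted).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of option 2: a boolean "key is null" flag precedes the key,
// and no key bytes are written at all when the key is null.
class NullableKeyLayoutSketch {

  // Layout: [keyIsNull flag][key, only when present][value]
  static byte[] encode(String key, String value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      if (key != null) {
        out.writeBoolean(false); // flag: key is present
        out.writeUTF(key);
      } else {
        out.writeBoolean(true); // flag: key is null, so no key bytes follow
      }
      out.writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }

  // Reads the flag first and only decodes a key when the flag says one was written.
  static String[] decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      boolean keyIsNull = in.readBoolean();
      String key = keyIsNull ? null : in.readUTF();
      String value = in.readUTF();
      return new String[] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String[] roundTrip = decode(encode(null, "v"));
    System.out.println("key=" + roundTrip[0] + ", value=" + roundTrip[1]);
    System.out.println("bytes with key: " + encode("k", "v").length
        + ", bytes without key: " + encode(null, "v").length);
  }
}
```

Here a record with a key takes 7 bytes (one flag byte plus two length-prefixed one-character strings), and a null key drops it to 4. Because the flag is written even for non-null keys, this layout is presumably not byte-compatible with what KafkaRecordCoder writes today either; option 2's compatibility advantage would come from leaving the existing coder untouched rather than from the format itself.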