/cc folks who commented on the issue: @Robin Qiu, @Chamikara Jayalath,
@Alexey Romanenko, @Daniel Collins

On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu wrote:

> Hello,
>
> I'm working on this issue (https://issues.apache.org/jira/browse/BEAM-12008)
> with Boyuan. She was very helpful in identifying the root cause:
> KafkaRecordCoder can't handle records whose key is null.
>
> We came up with two potential solutions, but each has its pros and cons,
> so I'm hoping to gather suggestions, opinions, or other ideas on how to
> handle this issue. The two solutions are:
>
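> 1. Directly wrap the keyCoder with NullableCoder, i.e.
> NullableCoder.of(keyCoder).
>     Cons: backwards-compatibility problem. NullableCoder adds a null-marker
>     byte in front of each key, so records encoded by the current
>     KafkaRecordCoder could no longer be decoded by the updated one.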
>
> 2. Write a completely new class, named something like
> NullableKeyKafkaRecordCoder.
>     Instead of using KvCoder to encode/decode KVs, it holds keyCoder and
> valueCoder as fields, plus a BooleanCoder that encodes/decodes a true/false
> flag marking whether the key is null. If the key is null, keyCoder is
> skipped on both encode and decode.
>
>     - [L63] encode(...) {
>           stringCoder.encode(topic, ...);
>           intCoder.encode(partition, ...);
>           longCoder.encode(offset, ...);
>           longCoder.encode(timestamp, ...);
>           intCoder.encode(timestampType.ordinal(), ...);
>           headerCoder.encode(...);
>           if (key != null) {
>               booleanCoder.encode(false, ...);  // flag: key is present
>               keyCoder.encode(key, ...);
>           } else {
>               booleanCoder.encode(true, ...);   // flag: key is null
>               // skip keyCoder when the key is null
>           }
>           valueCoder.encode(value, ...);
>       }
>
>     - [L74] decode(...) {
>           // Java evaluates constructor arguments left to right, so the
>           // fields are read back in the same order encode() wrote them.
>           return new KafkaRecord<>(
>               stringCoder.decode(inStream),
>               intCoder.decode(inStream),
>               longCoder.decode(inStream),
>               longCoder.decode(inStream),
>               KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>               (Headers) toHeaders(headerCoder.decode(inStream)),
>               booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>               valueCoder.decode(inStream));
>       }
>
> Best regards,
> Weiwen
>
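To make the two options easier to compare, here is a minimal, stdlib-only
Java sketch of the layout in option 2. It is a simplification under stated
assumptions: the record is reduced to a String key and a String value, and
`writeString`/`readString` (length-prefixed UTF-8) are hypothetical stand-ins
for the real key and value coders. None of the names below are Beam APIs. The
sketch round-trips a null key and prints encoded sizes, which also illustrates
the concern behind option 1: any presence marker in front of the key (as I
understand it, NullableCoder writes one such byte) shifts the bytes relative to
the current key-then-value layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, stdlib-only sketch of option 2's layout. Nothing here is a Beam API.
public class NullableKeySketch {

    // Simplified stand-in for KafkaRecord: only a (possibly null) key and a value.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Rec(key=" + key + ", value=" + value + ")";
        }
    }

    // Stand-in for a key/value coder: 4-byte length prefix, then UTF-8 bytes.
    static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(b.length);
        out.write(b);
    }

    static String readString(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    // Current-style layout: key then value, no flag (throws on a null key).
    static byte[] legacyEncode(Rec r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeString(out, r.key); // NullPointerException if r.key is null
        writeString(out, r.value);
        out.flush();
        return bytes.toByteArray();
    }

    // Option 2 layout: a one-byte "key is null" flag, then the key only if present.
    static byte[] encode(Rec r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        boolean keyIsNull = r.key == null;
        out.writeBoolean(keyIsNull);
        if (!keyIsNull) {
            writeString(out, r.key);
        }
        writeString(out, r.value);
        out.flush();
        return bytes.toByteArray();
    }

    static Rec decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        // The flag is read first and decides whether a key follows.
        String key = in.readBoolean() ? null : readString(in);
        String value = readString(in);
        return new Rec(key, value);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decode(encode(new Rec("k", "v"))));  // Rec(key=k, value=v)
        System.out.println(decode(encode(new Rec(null, "v")))); // Rec(key=null, value=v)
        // The flag adds one byte, so old and new encodings are not interchangeable.
        System.out.println(legacyEncode(new Rec("k", "v")).length); // 10
        System.out.println(encode(new Rec("k", "v")).length);       // 11
    }
}
```

In option 2 that extra byte is confined to a new coder class, so pipelines that
keep using the existing KafkaRecordCoder see no change in their encoding,
whereas option 1 changes KafkaRecordCoder itself.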
