Hello,

I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008) 
with Boyuan. She was very helpful in identifying the problem: 
KafkaRecordCoder cannot handle a record whose key is null.

We came up with two potential solutions, but each has its pros and cons, so 
I'm hoping to gather suggestions, opinions, or ideas on how to handle this 
issue. Our solutions:

1. Directly wrap the keyCoder with NullableCoder, i.e. 
NullableCoder.of(keyCoder)
    cons: breaks backwards compatibility, since the encoded format changes

2. Write a completely new class named something like 
NullableKeyKafkaRecordCoder
    Instead of using KvCoder to encode/decode KVs, it would have KeyCoder and 
ValueCoder as fields, plus a BooleanCoder that encodes/decodes a true/false 
flag marking a null key. If the key is null, KeyCoder is skipped on both 
encode and decode.
   
              - [L63] encode(...){
                       stringCoder.encode(topic, ...);
                       intCoder.encode(partition, ...);
                       longCoder.encode(offset, ...);
                       longCoder.encode(timestamp, ...);
                       intCoder.encode(timestamptype, ...);
                       headerCoder.encode(...);
                       if(key != null){
                              BooleanCoder.encode(false, ...);
                              KeyCoder.encode(key, ...);
                       }else{
                              BooleanCoder.encode(true, ...);
                              // skips KeyCoder when key is null
                       }
                       ValueCoder.encode(value, ...);
                }
    
              - [L74] decode(...){
                      return new KafkaRecord<>(
                              stringCoder.decode(inStream),
                              intCoder.decode(inStream),
                              longCoder.decode(inStream),
                              longCoder.decode(inStream),
                              KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
                              (Headers) toHeaders(headerCoder.decode(inStream)),
                              BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
                              ValueCoder.decode(inStream)
                      );
                }
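
To make the flag idea in option 2 concrete, here is a minimal, self-contained sketch of the round trip. It is only an illustration under assumptions: it uses plain java.io streams rather than Beam's Coder API, it models just the key and value (topic, partition, offset, timestamp and headers are left out), and the class name NullableKeySketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for the proposed coder; java.io only, not Beam's Coder API.
public class NullableKeySketch {

  // Writes a flag, then the key only when it is non-null, then the value.
  static byte[] encode(String key, String value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeBoolean(key == null); // true means "key is null", as in the pseudocode
    if (key != null) {
      out.writeUTF(key);
    }
    out.writeUTF(value);
    out.flush();
    return bytes.toByteArray();
  }

  // Reads the flag first and skips the key when the flag says it was null.
  static String[] decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    String key = in.readBoolean() ? null : in.readUTF();
    String value = in.readUTF();
    return new String[] {key, value};
  }

  public static void main(String[] args) throws IOException {
    String[] withKey = decode(encode("k", "v"));
    String[] withoutKey = decode(encode(null, "v"));
    System.out.println(withKey[0] + " -> " + withKey[1]);
    System.out.println(withoutKey[0] + " -> " + withoutKey[1]);
  }
}
```

The decoder must read the fields in exactly the order the encoder wrote them, with the flag before the key, which is why the flag check sits between the headers and the value in the pseudocode above.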

Best regards,
Weiwen
