I think we should make NullableCoder a standard coder for Beam [1] and use
a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might be
the standard ByteArrayCoder, for example).
I think we already have compatible Java and Python NullableCoder
implementations, so implementing this should be relatively straightforward.
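
To make the idea concrete, here is a minimal, self-contained sketch of what
wrapping a byte-array key in a nullable encoding amounts to. It is not Beam
code: the class name NullableKeySketch, the marker values, and the 4-byte
length prefix are all assumptions chosen for illustration, and Beam's actual
wire format may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a nullable wrapper around a byte-array key coder. */
public class NullableKeySketch {
  // Marker values are assumptions for this sketch, not a statement of Beam's format.
  private static final byte NULL_MARKER = 0;
  private static final byte PRESENT_MARKER = 1;

  /** Encodes a possibly-null key as one marker byte, then length and payload if present. */
  static byte[] encode(byte[] key) {
    if (key == null) {
      // A null key is represented by the marker alone; the key coder is skipped.
      return new byte[] {NULL_MARKER};
    }
    ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + key.length);
    buf.put(PRESENT_MARKER).putInt(key.length).put(key);
    return buf.array();
  }

  /** Decodes bytes produced by encode(), returning null when the marker says so. */
  static byte[] decode(byte[] encoded) {
    ByteBuffer buf = ByteBuffer.wrap(encoded);
    byte marker = buf.get();
    if (marker == NULL_MARKER) {
      return null;
    }
    if (marker != PRESENT_MARKER) {
      throw new IllegalArgumentException("Unexpected marker byte: " + marker);
    }
    byte[] key = new byte[buf.getInt()];
    buf.get(key);
    return key;
  }
}
```

In this sketch a null key round-trips to null, while an empty key round-trips
to an empty array, so the two stay distinguishable. The extra marker byte is
also why bytes written without the wrapper cannot be read back with it, which
is the backwards compatibility concern raised for option 1 below.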

Non-standard coders may not be supported by runners at the cross-language
boundary.

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784

On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:

> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
> @Chamikara Jayalath <[email protected]>
> @Alexey Romanenko <[email protected]>
> @Daniel Collins <[email protected]>
>
> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>
>> Hello,
>>
>> I'm working on [this issue](
>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was
>> very helpful in identifying the problem, which is that KafkaRecordCoder
>> cannot handle the case where the key is null.
>>
>> We came up with two potential solutions, but both have their pros and
>> cons, so I'm hoping to gather some suggestions, opinions, or ideas on how
>> to handle this issue. Our solutions:
>>
>> 1. directly wrapping the keyCoder with NullableCoder, i.e.
>> NullableCoder.of(keyCoder)
>>     cons: backwards compatibility problem
>>
>> 2. writing a completely new class named something like
>> NullableKeyKafkaRecordCoder
>>     Instead of using KVCoder to encode/decode KVs, we have KeyCoder and
>> ValueCoder as fields, plus a BooleanCoder to encode/decode a true/false
>> flag for the presence of a null key. If the key is null, KeyCoder is not
>> used to encode/decode it.
>>
>>               - [L63] encode(...){
>>                       stringCoder.encode(topic, ...);
>>                       intCoder.encode(partition, ...);
>>                       longCoder.encode(offset, ...);
>>                       longCoder.encode(timestamp, ...);
>>                       intCoder.encode(timestamptype, ...);
>>                       headerCoder.encode(...);
>>                       // the boolean flag is true when the key is null
>>                       if (key != null) {
>>                               BooleanCoder.encode(false, ...);
>>                               KeyCoder.encode(key, ...);
>>                       } else {
>>                               BooleanCoder.encode(true, ...);
>>                               // skips KeyCoder when key is null
>>                       }
>>                       ValueCoder.encode(value, ...);
>>                 }
>>
>>               - [L74] decode(...){
>>                       return new KafkaRecord<>(
>>                               stringCoder.decode(inStream),
>>                               intCoder.decode(inStream),
>>                               longCoder.decode(inStream),
>>                               longCoder.decode(inStream),
>>                               KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>                               (Headers) toHeaders(headerCoder.decode(inStream)),
>>                               BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>                               ValueCoder.decode(inStream)
>>                       );
>>                 }
>>
>> Best regards,
>> Weiwen
>>
>
