Supporting the x-lang boundary is a good point. So you are suggesting that:

   1. We make NullableCoder a standard coder.
   2. KafkaIO wraps the keyCoder with NullableCoder directly where it is needed.

Is that correct?
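
To make the wire-format implication concrete, here is a minimal, self-contained
sketch of the "marker byte, then optional payload" idea behind wrapping a key
coder so it tolerates null. It is not Beam's NullableCoder or KafkaRecordCoder:
the class name, the marker values, and the 4-byte length prefix are all
assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration only; not Beam's NullableCoder or KafkaRecordCoder. */
final class NullableKeySketch {
    // Assumed marker values for this sketch.
    private static final byte NULL_MARKER = 0;
    private static final byte PRESENT_MARKER = 1;

    private NullableKeySketch() {}

    /** Writes one marker byte; if the key is present, also a 4-byte length and the key bytes. */
    static byte[] encodeKey(byte[] key) {
        if (key == null) {
            // A null key is represented by the marker alone; no payload follows.
            return new byte[] {NULL_MARKER};
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + key.length);
        buf.put(PRESENT_MARKER).putInt(key.length).put(key);
        return buf.array();
    }

    /** Reads the marker byte first and only reads a payload when the key is present. */
    static byte[] decodeKey(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);
        byte marker = buf.get();
        if (marker == NULL_MARKER) {
            return null;
        }
        if (marker != PRESENT_MARKER) {
            throw new IllegalArgumentException("Unexpected marker byte: " + marker);
        }
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        return key;
    }
}
```

Because every encoded key now starts with an extra marker byte, bytes written
without the wrapper cannot be read back with it, which is the backwards
compatibility concern raised for option 1.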


On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
wrote:

> I think we should make NullableCoder a standard coder for Beam [1] and use
> a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might be
> the standard ByteArrayCoder, for example).
> I think we have compatible Java and Python NullableCoder implementations
> already, so implementing this should be relatively straightforward.
>
> Non-standard coders may not be supported by runners at the cross-language
> boundary.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>
> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>
>> /cc folks who commented on the issue: @Robin Qiu <[email protected]> 
>> @Chamikara
>> Jayalath <[email protected]> @Alexey Romanenko
>> <[email protected]> @Daniel Collins <[email protected]>
>>
>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I'm working on [this issue](
>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was
>>> very helpful in identifying the issue, which is that KafkaRecordCoder
>>> cannot handle the case when the key is null.
>>>
>>> We came up with two potential solutions, but both have their pros and
>>> cons, so I'm hoping to gather suggestions, opinions, or ideas on how to
>>> handle this issue. Our solutions:
>>>
>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>> NullableCoder.of(keyCoder)
>>>     cons: backwards compatibility problem
>>>
>>> 2. Write a completely new class named something like
>>> NullableKeyKafkaRecordCoder.
>>>     Instead of using KvCoder to encode/decode KVs, we would have KeyCoder
>>> and ValueCoder as fields, plus a BooleanCoder to encode/decode a true/false
>>> flag for the presence of a null key. If the key is null, KeyCoder is
>>> skipped during encode/decode.
>>>
>>>               - [L63] encode(...){
>>>                        stringCoder.encode(topic, ...);
>>>                        intCoder.encode(partition, ...);
>>>                        longCoder.encode(offset, ...);
>>>                        longCoder.encode(timestamp, ...);
>>>                        intCoder.encode(timestamptype, ...);
>>>                        headerCoder.encode(...);
>>>                        if (key != null) {
>>>                               // flag is false: a key follows
>>>                               BooleanCoder.encode(false, ...);
>>>                               KeyCoder.encode(key, ...);
>>>                        } else {
>>>                               // flag is true: key is null, so KeyCoder is skipped
>>>                               BooleanCoder.encode(true, ...);
>>>                        }
>>>                        ValueCoder.encode(value, ...);
>>>                 }
>>>
>>>               - [L74] decode(...){
>>>                       return new KafkaRecord<>(
>>>                               stringCoder.decode(inStream),
>>>                               intCoder.decode(inStream),
>>>                               longCoder.decode(inStream),
>>>                               longCoder.decode(inStream),
>>>                               KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>                               (Headers) toHeaders(headerCoder.decode(inStream)),
>>>                               BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>                               ValueCoder.decode(inStream));
>>>                 }
>>>
>>> Best regards,
>>> Weiwen
>>>
>>
