On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:

> Supporting the x-lang boundary is a good point. So you are suggesting that:
>
>    1. We make NullableCoder a standard coder.
>    2. KafkaIO wraps the keyCoder with NullableCoder directly if
>    required.
>
> Is that correct?
>

Yeah.
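
To make the wrapping idea concrete, here is a minimal stand-alone sketch. It
does not use Beam: SimpleCoder, BYTES, nullable, toBytes and fromBytes are
hypothetical names made up for illustration, and the marker values (0 = null,
1 = present) are an assumption of this sketch rather than a statement about
the Beam wire format. It only shows the general technique of prefixing the
key with a presence byte.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-alone illustration; none of these types are Beam classes.
final class NullableKeySketch {

  /** Hypothetical stand-in for a coder (not Beam's Coder API). */
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  /** Length-prefixed byte[] coder that rejects null, like an unwrapped key coder. */
  static final SimpleCoder<byte[]> BYTES =
      new SimpleCoder<byte[]>() {
        @Override
        public void encode(byte[] value, DataOutputStream out) throws IOException {
          if (value == null) {
            throw new IOException("cannot encode a null byte[]");
          }
          out.writeInt(value.length);
          out.write(value);
        }

        @Override
        public byte[] decode(DataInputStream in) throws IOException {
          byte[] value = new byte[in.readInt()];
          in.readFully(value);
          return value;
        }
      };

  /** Wraps a coder with a one-byte presence marker (assumed: 0 = null, 1 = present). */
  static <T> SimpleCoder<T> nullable(SimpleCoder<T> inner) {
    return new SimpleCoder<T>() {
      @Override
      public void encode(T value, DataOutputStream out) throws IOException {
        if (value == null) {
          out.writeByte(0); // marker only; the inner coder is skipped
        } else {
          out.writeByte(1);
          inner.encode(value, out);
        }
      }

      @Override
      public T decode(DataInputStream in) throws IOException {
        return in.readByte() == 0 ? null : inner.decode(in);
      }
    };
  }

  /** Encodes a value to a byte array, rethrowing IOException unchecked. */
  static <T> byte[] toBytes(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      coder.encode(value, new DataOutputStream(buffer));
      return buffer.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes a value from a byte array, rethrowing IOException unchecked. */
  static <T> T fromBytes(SimpleCoder<T> coder, byte[] bytes) {
    try {
      return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    SimpleCoder<byte[]> keyCoder = nullable(BYTES);
    byte[] key = {42};
    // A null key round-trips through the wrapped coder.
    System.out.println(fromBytes(keyCoder, toBytes(keyCoder, null)) == null);
    // The same key encodes to different lengths with and without the wrapper.
    System.out.println(toBytes(BYTES, key).length);
    System.out.println(toBytes(keyCoder, key).length);
  }
}
```

In this sketch the wrapped encoding is one byte longer than the unwrapped one
for the same key, so bytes written by one cannot be read by the other. That is
the kind of change behind the backwards compatibility concern raised for
option 1.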


>
>
> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> I think we should make NullableCoder a standard coder for Beam [1] and
>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might
>> be the standard ByteArrayCoder, for example).
>> I think we have compatible Java and Python NullableCoder implementations
>> already, so implementing this should be relatively straightforward.
>>
>> Non-standard coders may not be supported by runners at the cross-language
>> boundary.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>
>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>
>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]> 
>>> @Chamikara
>>> Jayalath <[email protected]> @Alexey Romanenko
>>> <[email protected]> @Daniel Collins <[email protected]>
>>>
>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm working on [this issue](
>>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was
>>>> very helpful in identifying the issue, which is that KafkaRecordCoder
>>>> cannot handle the case where the key is null.
>>>>
>>>> We came up with two potential solutions. Both have their pros and
>>>> cons, so I'm hoping to gather suggestions, opinions, or ideas on how to
>>>> handle this issue. Our solutions:
>>>>
>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>> NullableCoder.of(keyCoder)
>>>>     cons: backwards compatibility problem
>>>>
>>>> 2. Write a completely new class named something like
>>>> NullableKeyKafkaRecordCoder.
>>>>     Instead of using KvCoder to encode/decode KVs, we keep KeyCoder
>>>> and ValueCoder as fields, plus a BooleanCoder to encode/decode a true/false
>>>> flag for the presence of a null key. If the key is null, KeyCoder is skipped.
>>>>
>>>>               - [L63] encode(...) {
>>>>                        stringCoder.encode(topic, ...);
>>>>                        intCoder.encode(partition, ...);
>>>>                        longCoder.encode(offset, ...);
>>>>                        longCoder.encode(timestamp, ...);
>>>>                        intCoder.encode(timestamptype, ...);
>>>>                        headerCoder.encode(...);
>>>>                        if (key != null) {
>>>>                               // flag is false: a key follows
>>>>                               BooleanCoder.encode(false, ...);
>>>>                               KeyCoder.encode(key, ...);
>>>>                        } else {
>>>>                               // flag is true: key is null, so KeyCoder is skipped
>>>>                               BooleanCoder.encode(true, ...);
>>>>                        }
>>>>                        ValueCoder.encode(value, ...);
>>>>                 }
>>>>
>>>>               - [L74] decode(...) {
>>>>                       return new KafkaRecord<>(
>>>>                           stringCoder.decode(inStream),
>>>>                           intCoder.decode(inStream),
>>>>                           longCoder.decode(inStream),
>>>>                           longCoder.decode(inStream),
>>>>                           KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>                           (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>                           BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>>                           ValueCoder.decode(inStream));
>>>>                 }
>>>>
>>>> Best regards,
>>>> Weiwen
>>>>
>>>
