Taking the related problem of populating KafkaRecord metadata (BEAM-12076
<https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>) into account
as well, what's the plan there? Are we going to make KafkaRecordCoder a
well-known coder as well? I ask because this might be a good chance to
revisit the KafkaRecordCoder implementation.

On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>
>> Supporting the x-lang boundary is a good point. So you are suggesting
>> that:
>>
>>    1. We make NullableCoder a standard coder.
>>    2. KafkaIO wraps the keyCoder with NullableCoder directly where
>>    required.
>>
>> Is that correct?
>>
>
> Yeah.
>
>
>>
>>
>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might
>>> be the standard ByteArrayCoder, for example).
>>> I think we already have compatible Java and Python NullableCoder
>>> implementations, so implementing this should be relatively straightforward.
>>>
>>> Non-standard coders may not be supported by runners at the
>>> cross-language boundary.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>
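Roughly speaking, NullableCoder (in both the Java and Python SDKs) writes a one-byte null indicator: 0x00 for null, or 0x01 for a present value. For present values, the encoding from the wrapped coder follows. The plain-Java sketch below models that layout for a `byte[]` key without depending on Beam. The class name `NullableKeySketch` is hypothetical. The inner encoding (a 4-byte length prefix via `DataOutputStream.writeInt`) is a simplifying assumption and is not Beam's actual `ByteArrayCoder` wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical model of a nullable byte[] key: indicator byte, then the inner encoding if present. */
class NullableKeySketch {
  static final int ENCODE_NULL = 0;    // indicator for a null value
  static final int ENCODE_PRESENT = 1; // indicator for a present value

  static byte[] encode(byte[] key) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    if (key == null) {
      out.writeByte(ENCODE_NULL); // nothing else is written for a null key
    } else {
      out.writeByte(ENCODE_PRESENT);
      // Simplified inner encoding (assumption): 4-byte length prefix, then raw bytes.
      out.writeInt(key.length);
      out.write(key);
    }
    out.flush();
    return bytes.toByteArray();
  }

  static byte[] decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int indicator = in.readUnsignedByte();
    if (indicator == ENCODE_NULL) {
      return null;
    }
    if (indicator != ENCODE_PRESENT) {
      throw new IOException("Unexpected null indicator: " + indicator);
    }
    byte[] key = new byte[in.readInt()];
    in.readFully(key);
    return key;
  }

  public static void main(String[] args) throws IOException {
    byte[] present = encode(new byte[] {42});
    System.out.println(Arrays.toString(present));           // [1, 0, 0, 0, 1, 42]
    System.out.println(decode(encode(null)) == null);        // true
    System.out.println(Arrays.toString(decode(present)));    // [42]
  }
}
```

Every present key gains a leading indicator byte. As a result, bytes written by a coder that encoded the key directly would not decode under a NullableCoder-wrapped key coder. This is one likely source of the backward-compatibility concern raised for option 1 further down the thread.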
>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>
>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>>> @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>>> <[email protected]> @Daniel Collins <[email protected]>
>>>>
>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008)
>>>>> with Boyuan. She was very helpful in identifying the root cause:
>>>>> KafkaRecordCoder can't handle records whose key is null.
>>>>>
>>>>> We came up with two potential solutions, but each has its pros and
>>>>> cons, so I'm hoping to gather suggestions, opinions, or ideas on how to
>>>>> handle this issue. Our solutions are:
>>>>>
>>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>>> NullableCoder.of(keyCoder).
>>>>>     Cons: backward-compatibility problem.
>>>>>
>>>>> 2. Write a completely new class named something like
>>>>> NullableKeyKafkaRecordCoder.
>>>>>     Instead of using a KvCoder to encode/decode KVs, it has keyCoder
>>>>> and valueCoder as fields, plus a booleanCoder that encodes/decodes a
>>>>> true/false flag for whether the key is null. If the key is null,
>>>>> keyCoder is skipped on both encode and decode.
>>>>>
>>>>>       - [L63] encode(...) {
>>>>>             stringCoder.encode(topic, ...);
>>>>>             intCoder.encode(partition, ...);
>>>>>             longCoder.encode(offset, ...);
>>>>>             longCoder.encode(timestamp, ...);
>>>>>             intCoder.encode(timestampType, ...);
>>>>>             headerCoder.encode(...);
>>>>>             if (key != null) {
>>>>>                 booleanCoder.encode(false, ...);  // false: key is present
>>>>>                 keyCoder.encode(key, ...);
>>>>>             } else {
>>>>>                 booleanCoder.encode(true, ...);   // true: key is null
>>>>>                 // keyCoder is skipped when the key is null
>>>>>             }
>>>>>             valueCoder.encode(value, ...);
>>>>>         }
>>>>>
>>>>>       - [L74] decode(...) {
>>>>>             // Java evaluates constructor arguments left to right,
>>>>>             // so fields are read in the same order they were written.
>>>>>             return new KafkaRecord<>(
>>>>>                 stringCoder.decode(inStream),
>>>>>                 intCoder.decode(inStream),
>>>>>                 longCoder.decode(inStream),
>>>>>                 longCoder.decode(inStream),
>>>>>                 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>                 (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>                 // read the flag first; true means the key is null
>>>>>                 booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>>>>>                 valueCoder.decode(inStream));
>>>>>         }
>>>>>
>>>>> Best regards,
>>>>> Weiwen
>>>>>
>>>>
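Option 2 in Weiwen's message can be modeled in the same way. The sketch below is a hypothetical, simplified stand-in for the proposed NullableKeyKafkaRecordCoder. It keeps only the key and value, and it uses `DataOutputStream.writeBoolean`/`writeUTF` in place of Beam's BooleanCoder and string coders, so its bytes are not Beam's wire format. As in the proposal, a `true` flag means the key is null, and no key bytes follow it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical model of option 2: a boolean null-key flag, the key only if present, then the value. */
class NullKeyFlagSketch {
  /** Simplified stand-in for a KafkaRecord's key/value pair; key may be null. */
  static final class KeyValue {
    final String key;
    final String value;

    KeyValue(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  static byte[] encode(KeyValue kv) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    if (kv.key != null) {
      out.writeBoolean(false); // false: a key follows
      out.writeUTF(kv.key);
    } else {
      out.writeBoolean(true);  // true: key is null, no key bytes are written
    }
    out.writeUTF(kv.value);
    out.flush();
    return bytes.toByteArray();
  }

  static KeyValue decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    // Read the flag first; only read a key when the flag says one was written.
    String key = in.readBoolean() ? null : in.readUTF();
    String value = in.readUTF();
    return new KeyValue(key, value);
  }

  public static void main(String[] args) throws IOException {
    KeyValue nullKey = decode(encode(new KeyValue(null, "v")));
    System.out.println(nullKey.key + " " + nullKey.value);    // null v
    KeyValue withKey = decode(encode(new KeyValue("k", "v")));
    System.out.println(withKey.key + " " + withKey.value);    // k v
    System.out.println(encode(new KeyValue(null, "v")).length); // 4
  }
}
```

One detail may be worth checking before choosing between the two options. Assuming Beam's BooleanCoder writes `true` as the byte 1, the byte 1 marks a null key in this layout. That is the opposite polarity of NullableCoder's 0x00 null indicator.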
