I think for that and for any transform that produces a PCollection of a
type that is not represented by a standard coder, we would have to add a
cross-language builder class that returns a PCollection that can be
supported at the cross-language boundary. For example, it can be a
PCollection<Row> since RowCoder is already a standard coder. I haven't
looked closely into fixing this particular Jira though.

Thanks,
Cham

On Thu, Jun 3, 2021 at 2:31 PM Boyuan Zhang <[email protected]> wrote:

> Considering the problem of populating KafkaRecord metadata (BEAM-12076
> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>)
> together, what's the plan there? Are we going to make KafkaRecordCoder a
> well-known coder as well? I ask because it might be a good chance to
> revisit the KafkaRecordCoder implementation.
>
> On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Supporting the x-lang boundary is a good point. So you are suggesting
>>> that:
>>>
>>>    1. We make NullableCoder a standard coder.
>>>    2. KafkaIO wraps the keyCoder with NullableCoder directly where
>>>    required.
>>>
>>> Is that correct?
>>>
>>
>> Yeah.
>>
>>
>>>
>>>
>>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
>>> wrote:
>>>
>>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might
>>>> be the standard ByteArrayCoder, for example).
>>>> I think we have compatible Java and Python NullableCoder
>>>> implementations already, so implementing this should be relatively
>>>> straightforward.
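
To make the backwards compatibility concern with wrapping the key coder concrete, here is a minimal sketch using plain byte arrays instead of Beam coders. The class name NullablePrefixSketch, the helper names, and the marker values 0 and 1 are assumptions for illustration only and are not taken from Beam; the point is just that a nullable wrapper adds a leading marker byte, so bytes written without the wrapper no longer line up with what the wrapped decoder expects.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical model of wrapping a key encoder in a nullable encoder.
// None of these names come from Beam; marker values are assumed.
final class NullablePrefixSketch {
  static final int NULL_MARKER = 0;
  static final int PRESENT_MARKER = 1;

  // Stand-in for the unwrapped key coder: the key bytes as they are.
  static byte[] encodeRaw(byte[] key) {
    return key.clone();
  }

  // Wrapped form: one marker byte, followed by the key bytes when present.
  static byte[] encodeNullable(byte[] key) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (key == null) {
      out.write(NULL_MARKER);
    } else {
      out.write(PRESENT_MARKER);
      out.write(key, 0, key.length);
    }
    return out.toByteArray();
  }

  // Reads the marker byte first and only then looks at the key bytes.
  static byte[] decodeNullable(byte[] encoded) {
    if (encoded[0] == NULL_MARKER) {
      return null;
    }
    return Arrays.copyOfRange(encoded, 1, encoded.length);
  }

  public static void main(String[] args) {
    byte[] key = {7, 8};
    System.out.println(Arrays.toString(encodeRaw(key)));       // [7, 8]
    System.out.println(Arrays.toString(encodeNullable(key)));  // [1, 7, 8]
    System.out.println(Arrays.toString(encodeNullable(null))); // [0]
  }
}
```

In this model the wrapped and unwrapped encodings of the same key differ by the marker byte, which is why data encoded before such a change would not decode cleanly afterwards.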
>>>>
>>>> Non-standard coders may not be supported by runners at the
>>>> cross-language boundary.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>>
>>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>>
>>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>>>> @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>>>> <[email protected]> @Daniel Collins <[email protected]>
>>>>>
>>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm working on [this issue](
>>>>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She
>>>>>> was very helpful in identifying the issue, which is that KafkaRecordCoder
>>>>>> can't handle the case where the key is null.
>>>>>>
>>>>>> We came up with two potential solutions. Both have their pros and
>>>>>> cons, so I'm hoping to gather suggestions, opinions, or ideas on how to
>>>>>> handle this issue. Our solutions:
>>>>>>
>>>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>>>> NullableCoder.of(keyCoder)
>>>>>>     cons: backwards compatibility problem
>>>>>>
>>>>>> 2. Write a completely new class named something like
>>>>>> NullableKeyKafkaRecordCoder
>>>>>>     Instead of using KvCoder to encode/decode KVs, we keep KeyCoder
>>>>>> and ValueCoder as fields, plus a BooleanCoder that encodes/decodes a
>>>>>> true/false flag for the presence of a null key. If the key is null,
>>>>>> KeyCoder does not encode/decode it.
>>>>>>
>>>>>>               - [L63] encode(...){
>>>>>>                        stringCoder.encode(topic, ...);
>>>>>>                        intCoder.encode(partition, ...);
>>>>>>                        longCoder.encode(offset, ...);
>>>>>>                        longCoder.encode(timestamp, ...);
>>>>>>                        intCoder.encode(timestamptype, ...);
>>>>>>                        headerCoder.encode(...);
>>>>>>                        // the flag is true when the key is null
>>>>>>                        if (key != null) {
>>>>>>                               BooleanCoder.encode(false, ...);
>>>>>>                               KeyCoder.encode(key, ...);
>>>>>>                        } else {
>>>>>>                               BooleanCoder.encode(true, ...);
>>>>>>                               // skips KeyCoder when key is null
>>>>>>                        }
>>>>>>                        ValueCoder.encode(value, ...);
>>>>>>                 }
>>>>>>
>>>>>>               - [L74] decode(...){
>>>>>>                       return new KafkaRecord<>(
>>>>>>                           stringCoder.decode(inStream),
>>>>>>                           intCoder.decode(inStream),
>>>>>>                           longCoder.decode(inStream),
>>>>>>                           longCoder.decode(inStream),
>>>>>>                           KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>>                           (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>>                           BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>>>>                           ValueCoder.decode(inStream));
>>>>>>                 }
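
A minimal, self-contained sketch of the option 2 layout, assuming plain java.io streams in place of Beam coders. The class name NullableKeyLayoutSketch and the length-prefixed byte arrays are assumptions for illustration and are not part of KafkaRecordCoder; only the null-key flag, the key, and the value are modeled, while topic, partition, offset, timestamp, and headers are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for option 2; plain java.io replaces the Beam coders.
final class NullableKeyLayoutSketch {

  // Writes the "key is null" flag, then the key only when present, then the value.
  static byte[] encode(byte[] key, byte[] value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeBoolean(key == null);
      if (key != null) {
        writeBytes(out, key);
      }
      writeBytes(out, value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Returns {key, value}; the key slot is null when the flag was set.
  static byte[][] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] key = in.readBoolean() ? null : readBytes(in);
      byte[] value = readBytes(in);
      return new byte[][] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Length-prefixed byte array, an assumed framing for this sketch.
  private static void writeBytes(DataOutputStream out, byte[] data) throws IOException {
    out.writeInt(data.length);
    out.write(data);
  }

  private static byte[] readBytes(DataInputStream in) throws IOException {
    byte[] data = new byte[in.readInt()];
    in.readFully(data);
    return data;
  }

  public static void main(String[] args) {
    byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
    byte[][] decoded = decode(encode(null, value));
    System.out.println(decoded[0] == null);                             // true
    System.out.println(new String(decoded[1], StandardCharsets.UTF_8)); // payload
  }
}
```

The decoder reads the flag before touching the key, mirroring the order in the encode/decode outline above, so a null key costs one flag byte and nothing else.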
>>>>>>
>>>>>> Best regards,
>>>>>> Weiwen
>>>>>>
>>>>>
