Could you register a schema for KafkaRecord? Then you can use SchemaCoder,
which handles the conversion to/from Row.

On Thu, Jun 3, 2021 at 2:39 PM Chamikara Jayalath <[email protected]>
wrote:

> I think for that and for any transform that produces a PCollection of a
> type that is not represented by a standard coder, we would have to add a
> cross-language builder class that returns a PCollection that can be
> supported at the cross-language boundary. For example, it can be a
> PCollection<Row> since RowCoder is already a standard coder. I haven't
> looked closely into fixing this particular Jira though.
>
> Thanks,
> Cham
>
> On Thu, Jun 3, 2021 at 2:31 PM Boyuan Zhang <[email protected]> wrote:
>
>> Taking the related problem of populating KafkaRecord metadata (BEAM-12076
>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>) into
>> account as well, what's the plan there? Are we going to make
>> KafkaRecordCoder a well-known coder as well? I ask because this might be a
>> good chance to revisit the KafkaRecordCoder implementation.
>>
>> On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Supporting the x-lang boundary is a good point. So you are suggesting
>>>> that:
>>>>
>>>>    1. We make NullableCoder a standard coder.
>>>>    2. KafkaIO wraps the keyCoder with NullableCoder directly when
>>>>    required.
>>>>
>>>> Is that correct?
>>>>
>>>
>>> Yeah.
>>>
>>>
>>>>
>>>>
>>>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
>>>> wrote:
>>>>
>>>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder
>>>>> might be the standard ByteArrayCoder, for example).
>>>>> I think we already have compatible Java and Python NullableCoder
>>>>> implementations, so implementing this should be relatively
>>>>> straightforward.
>>>>>
>>>>> Non-standard coders may not be supported by runners at the
>>>>> cross-language boundary.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>>>
>>>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>>>
>>>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>>>>>  @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>>>>> <[email protected]> @Daniel Collins <[email protected]>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm working on this issue
>>>>>>> (https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She
>>>>>>> was very helpful in identifying the cause: KafkaRecordCoder can't
>>>>>>> handle records whose key is null.
>>>>>>>
>>>>>>> We came up with two potential solutions. Both have pros and cons, so
>>>>>>> I'm hoping to gather suggestions, opinions, or ideas on how to handle
>>>>>>> this issue. The two solutions are:
>>>>>>>
>>>>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>>>>> NullableCoder.of(keyCoder).
>>>>>>>     Cons: backwards compatibility problem, since the wrapper changes
>>>>>>>     the encoded bytes of every record's key.
>>>>>>>
>>>>>>> 2. Write a completely new class, named something like
>>>>>>> NullableKeyKafkaRecordCoder.
>>>>>>>     Instead of using a KvCoder to encode/decode KVs, it has keyCoder
>>>>>>>     and valueCoder fields plus a BooleanCoder that encodes/decodes
>>>>>>>     true/false to mark whether the key is null. If the key is null,
>>>>>>>     keyCoder is skipped on both encode and decode.
>>>>>>>
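>>>>>>>     - [L63] encode:
>>>>>>>       public void encode(KafkaRecord<K, V> record, OutputStream outStream)
>>>>>>>           throws IOException {
>>>>>>>         stringCoder.encode(record.getTopic(), outStream);
>>>>>>>         intCoder.encode(record.getPartition(), outStream);
>>>>>>>         longCoder.encode(record.getOffset(), outStream);
>>>>>>>         longCoder.encode(record.getTimestamp(), outStream);
>>>>>>>         intCoder.encode(record.getTimestampType().ordinal(), outStream);
>>>>>>>         headerCoder.encode(toIterable(record), outStream);
>>>>>>>         K key = record.getKV().getKey();
>>>>>>>         // true marks a null key; keyCoder is skipped in that case
>>>>>>>         booleanCoder.encode(key == null, outStream);
>>>>>>>         if (key != null) {
>>>>>>>           keyCoder.encode(key, outStream);
>>>>>>>         }
>>>>>>>         valueCoder.encode(record.getKV().getValue(), outStream);
>>>>>>>       }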
>>>>>>>
>>>>>>>     - [L74] decode:
>>>>>>>       public KafkaRecord<K, V> decode(InputStream inStream) throws IOException {
>>>>>>>         // Java evaluates constructor arguments left to right, so the
>>>>>>>         // fields are read back in the same order encode() wrote them.
>>>>>>>         return new KafkaRecord<>(
>>>>>>>             stringCoder.decode(inStream),
>>>>>>>             intCoder.decode(inStream),
>>>>>>>             longCoder.decode(inStream),
>>>>>>>             longCoder.decode(inStream),
>>>>>>>             KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>>>             (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>>>             // a true marker means the key is null and was not encoded
>>>>>>>             booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>>>>>>>             valueCoder.decode(inStream));
>>>>>>>       }
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Weiwen
>>>>>>>
>>>>>>
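
A minimal, Beam-free sketch of the nullable framing discussed in this thread, assuming the layout used by Beam's Java NullableCoder: one indicator byte (0x00 for null, 0x01 for present), followed by the inner encoding only when the value is present. The inner "key coder" here is a hypothetical stand-in that writes a 4-byte length prefix; Beam's ByteArrayCoder frames nested values differently. The class and method names (NullableKeySketch, encodeNullable, decodeNullable, encodeLegacy) are illustrative only. It also shows why option 1 raises a backwards-compatibility concern: the same non-null key produces different bytes once it is wrapped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class NullableKeySketch {
  private static final int ENCODE_NULL = 0;    // indicator byte for a null key
  private static final int ENCODE_PRESENT = 1; // indicator byte for a present key

  // Hypothetical inner key coder: 4-byte length prefix followed by the raw bytes.
  private static void encodeKey(byte[] key, DataOutputStream out) throws IOException {
    out.writeInt(key.length);
    out.write(key);
  }

  private static byte[] decodeKey(DataInputStream in) throws IOException {
    byte[] key = new byte[in.readInt()];
    in.readFully(key);
    return key;
  }

  /** Nullable wrapper: indicator byte, then the inner encoding only if present. */
  public static byte[] encodeNullable(byte[] key) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (key == null) {
        out.writeByte(ENCODE_NULL);
      } else {
        out.writeByte(ENCODE_PRESENT);
        encodeKey(key, out);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the indicator byte and rejects anything other than 0x00 or 0x01. */
  public static byte[] decodeNullable(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      int indicator = in.readUnsignedByte();
      if (indicator == ENCODE_NULL) {
        return null;
      }
      if (indicator != ENCODE_PRESENT) {
        throw new IOException("unexpected null indicator: " + indicator);
      }
      return decodeKey(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** The existing, unwrapped layout: inner encoding only, no indicator byte. */
  public static byte[] encodeLegacy(byte[] key) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      encodeKey(key, out);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] key = {7, 8};
    System.out.println(decodeNullable(encodeNullable(null)) == null);           // true
    System.out.println(Arrays.equals(key, decodeNullable(encodeNullable(key)))); // true
    // Same non-null key, different bytes: data written in the old layout
    // does not match what the wrapped coder writes or expects to read.
    System.out.println(Arrays.equals(encodeLegacy(key), encodeNullable(key)));  // false
  }
}
```

Option 2's BooleanCoder marker likewise adds a byte ahead of the key, so it changes the per-record layout in a similar way; the difference is that it lives in a new, separately named coder class rather than altering the existing KafkaRecordCoder.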
