Could you register a schema for KafkaRecord? Then you can use SchemaCoder, which handles the conversion to/from Row.
On Thu, Jun 3, 2021 at 2:39 PM Chamikara Jayalath <[email protected]> wrote:
> I think for that and for any transform that produces a PCollection of a
> type that is not represented by a standard coder, we would have to add a
> cross-language builder class that returns a PCollection that can be
> supported at the cross-language boundary. For example, it can be a
> PCollection<Row> since RowCoder is already a standard coder. I haven't
> looked closely into fixing this particular Jira though.
>
> Thanks,
> Cham
>
> On Thu, Jun 3, 2021 at 2:31 PM Boyuan Zhang <[email protected]> wrote:
>
>> Considering the problem of populating KafkaRecord metadata (BEAM-12076
>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>)
>> together, what's the plan there? Are we going to make KafkaRecordCoder a
>> well-known coder as well? I ask because it might be a good chance to
>> revisit the KafkaRecordCoder implementation.
>>
>> On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Supporting the x-lang boundary is a good point. So you are suggesting
>>>> that:
>>>>
>>>> 1. We make NullableCoder a standard coder.
>>>> 2. KafkaIO wraps the keyCoder with NullableCoder directly if
>>>>    required.
>>>>
>>>> Is that correct?
>>>>
>>>
>>> Yeah.
>>>
>>>>
>>>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
>>>> wrote:
>>>>
>>>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder
>>>>> might be the standard ByteArrayCoder, for example).
>>>>> I think we have compatible Java and Python NullableCoder
>>>>> implementations already, so implementing this should be relatively
>>>>> straightforward.
>>>>>
>>>>> Non-standard coders may not be supported by runners at the
>>>>> cross-language boundary.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>>>
>>>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>>>
>>>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>>>>> @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>>>>> <[email protected]> @Daniel Collins <[email protected]>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008)
>>>>>>> with Boyuan. She was very helpful in identifying the issue, which is
>>>>>>> that KafkaRecordCoder can't handle the case when the key is null.
>>>>>>>
>>>>>>> We came up with two potential solutions. Both have their pros and
>>>>>>> cons, so I'm hoping to gather some suggestions, opinions, or ideas on
>>>>>>> how to handle this issue. Our solutions:
>>>>>>>
>>>>>>> 1. Directly wrapping the keyCoder with NullableCoder, i.e.
>>>>>>>    NullableCoder.of(keyCoder).
>>>>>>>    Cons: backwards compatibility problem.
>>>>>>>
>>>>>>> 2. Writing a completely new class named something like
>>>>>>>    NullableKeyKafkaRecordCoder. Instead of using KvCoder to
>>>>>>>    encode/decode KVs, we have KeyCoder and ValueCoder as fields and
>>>>>>>    another BooleanCoder to encode/decode T/F for the presence of a
>>>>>>>    null key. If the key is null, KeyCoder will not encode/decode.
>>>>>>>
>>>>>>> - [L63] encode(...) {
>>>>>>>     stringCoder.encode(topic, ...);
>>>>>>>     intCoder.encode(partition, ...);
>>>>>>>     longCoder.encode(offset, ...);
>>>>>>>     longCoder.encode(timestamp, ...);
>>>>>>>     intCoder.encode(timestamptype, ...);
>>>>>>>     headerCoder.encode(...);
>>>>>>>     if (Key != null) {
>>>>>>>       BooleanCoder.encode(false, ...);
>>>>>>>       KeyCoder.encode(key, ...);
>>>>>>>     } else {
>>>>>>>       BooleanCoder.encode(true, ...);
>>>>>>>       // skips KeyCoder when key is null
>>>>>>>     }
>>>>>>>     ValueCoder.encode(value, ...);
>>>>>>>   }
>>>>>>>
>>>>>>> - [L74] decode(...) {
>>>>>>>     return new KafkaRecord<>(
>>>>>>>         stringCoder.decode(inStream),
>>>>>>>         intCoder.decode(inStream),
>>>>>>>         longCoder.decode(inStream),
>>>>>>>         longCoder.decode(inStream),
>>>>>>>         KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>>>         (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>>>         BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>>>>>         ValueCoder.decode(inStream));
>>>>>>>   }
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Weiwen
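For reference, the presence-flag layout described in option 2 of the original message can be sketched without any Beam dependency. This is only an illustration under stated assumptions: the class NullableKeySketch, its nested Record type, and the helper methods are hypothetical names made up for this example, plain java.io streams stand in for Beam coders, and only topic, key, and value are kept from the KafkaRecord fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical stand-in for illustration; this is not Beam's KafkaRecordCoder. */
public class NullableKeySketch {

  /** Reduced record: just enough fields to show the null-key handling. */
  static final class Record {
    final String topic;
    final byte[] key; // may be null
    final byte[] value;

    Record(String topic, byte[] key, byte[] value) {
      this.topic = topic;
      this.key = key;
      this.value = value;
    }
  }

  /** Writes topic, then a "key is null" flag, then the key (only if present), then the value. */
  static byte[] encode(Record r) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeUTF(r.topic);
      boolean keyIsNull = (r.key == null);
      out.writeBoolean(keyIsNull);
      if (!keyIsNull) {
        writeBytes(out, r.key); // skipped entirely when the key is null
      }
      writeBytes(out, r.value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the fields back in exactly the order encode() wrote them. */
  static Record decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      String topic = in.readUTF();
      byte[] key = in.readBoolean() ? null : readBytes(in);
      byte[] value = readBytes(in);
      return new Record(topic, key, value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Length-prefixed byte array, so the reader knows where the field ends.
  private static void writeBytes(DataOutputStream out, byte[] b) throws IOException {
    out.writeInt(b.length);
    out.write(b);
  }

  private static byte[] readBytes(DataInputStream in) throws IOException {
    byte[] b = new byte[in.readInt()];
    in.readFully(b);
    return b;
  }

  public static void main(String[] args) {
    Record withKey = decode(encode(new Record("t", new byte[] {1, 2}, new byte[] {3})));
    Record nullKey = decode(encode(new Record("t", null, new byte[] {3})));
    System.out.println(Arrays.toString(withKey.key)); // prints [1, 2]
    System.out.println(nullKey.key == null); // prints true
  }
}
```

Note that in this sketch the flag precedes the key in every record, including records with non-null keys, so the byte layout differs from a layout without the flag. That is presumably the same kind of backwards compatibility concern listed as the con of option 1.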
