I think for that and for any transform that produces a PCollection of a type that is not represented by a standard coder, we would have to add a cross-language builder class that returns a PCollection that can be supported at the cross-language boundary. For example, it can be a PCollection<Row> since RowCoder is already a standard coder. I haven't looked closely into fixing this particular Jira though.
Thanks,
Cham

On Thu, Jun 3, 2021 at 2:31 PM Boyuan Zhang <[email protected]> wrote:

> Considering the problem of populating KafkaRecord metadata (BEAM-12076
> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>)
> together, what's the plan there? Are we going to make KafkaRecordCoder a
> well-known coder as well? I ask because it might be a good chance to
> revisit the KafkaRecordCoder implementation.
>
> On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Supporting the x-lang boundary is a good point. So you are suggesting
>>> that:
>>>
>>> 1. We make NullableCoder a standard coder.
>>> 2. KafkaIO wraps the keyCoder with NullableCoder directly where
>>>    required.
>>>
>>> Is that correct?
>>
>> Yeah.
>>
>>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
>>> wrote:
>>>
>>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder
>>>> might be the standard ByteArrayCoder, for example).
>>>> I think we have compatible Java and Python NullableCoder
>>>> implementations already, so implementing this should be relatively
>>>> straightforward.
>>>>
>>>> Non-standard coders may not be supported by runners at the
>>>> cross-language boundary.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>>
>>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>>
>>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>>>> @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>>>> <[email protected]> @Daniel Collins <[email protected]>
>>>>>
>>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm working on [this issue](
>>>>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She
>>>>>> was very helpful in identifying the issue, which is that
>>>>>> KafkaRecordCoder can't handle the case where the key is null.
>>>>>>
>>>>>> We came up with two potential solutions. Both have their pros and
>>>>>> cons, so I'm hoping to gather some suggestions, opinions, or ideas on
>>>>>> how to handle this issue. Our solutions:
>>>>>>
>>>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>>>>    NullableCoder.of(keyCoder).
>>>>>>    Cons: backwards compatibility problem.
>>>>>>
>>>>>> 2. Write a completely new class named something like
>>>>>>    NullableKeyKafkaRecordCoder. Instead of using KVCoder to
>>>>>>    encode/decode KVs, it has KeyCoder and ValueCoder as fields, plus
>>>>>>    a BooleanCoder that encodes/decodes a true/false flag for the
>>>>>>    presence of a null key. If the key is null, KeyCoder is skipped
>>>>>>    on both encode and decode.
>>>>>>
>>>>>> - [L63] encode(...) {
>>>>>>     stringCoder.encode(topic, ...);
>>>>>>     intCoder.encode(partition, ...);
>>>>>>     longCoder.encode(offset, ...);
>>>>>>     longCoder.encode(timestamp, ...);
>>>>>>     intCoder.encode(timestamptype, ...);
>>>>>>     headerCoder.encode(...);
>>>>>>     if (key != null) {
>>>>>>       BooleanCoder.encode(false, ...);
>>>>>>       KeyCoder.encode(key, ...);
>>>>>>     } else {
>>>>>>       BooleanCoder.encode(true, ...);
>>>>>>       // skips KeyCoder when key is null
>>>>>>     }
>>>>>>     ValueCoder.encode(value, ...);
>>>>>>   }
>>>>>>
>>>>>> - [L74] decode(...) {
>>>>>>     return new KafkaRecord<>(
>>>>>>         stringCoder.decode(inStream),
>>>>>>         intCoder.decode(inStream),
>>>>>>         longCoder.decode(inStream),
>>>>>>         longCoder.decode(inStream),
>>>>>>         KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>>         (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>>         // flag is true when the key was null, so KeyCoder is skipped
>>>>>>         BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>>>>         ValueCoder.decode(inStream));
>>>>>>   }
>>>>>>
>>>>>> Best regards,
>>>>>> Weiwen

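For anyone following the thread without the Beam sources at hand, the null-key flag idea from option 2 can be sketched in plain Java. This is only an illustration under stated assumptions: the class NullableKeySketch and its methods are hypothetical, it uses java.io streams rather than Beam coders, and it is not how NullableCoder or KafkaRecordCoder are actually implemented. It writes a boolean that is true when the key is null, then the key only when present, then the value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone sketch; not Beam's NullableCoder or KafkaRecordCoder. */
final class NullableKeySketch {

  // Writes a 4-byte length followed by the raw bytes.
  private static void writeBytes(byte[] bytes, DataOutputStream out) throws IOException {
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  // Reads a 4-byte length followed by that many raw bytes.
  private static byte[] readBytes(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return bytes;
  }

  /** Encodes a null-key flag, then the key only when present, then the value. */
  static byte[] encode(byte[] key, byte[] value) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buffer);
      out.writeBoolean(key == null); // true means "key is null", as in the proposal
      if (key != null) {
        writeBytes(key, out);
      }
      writeBytes(value, out);
      out.flush();
      return buffer.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes into {key, value}; the key slot is null when the flag says so. */
  static byte[][] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] key = in.readBoolean() ? null : readBytes(in);
      byte[] value = readBytes(in);
      return new byte[][] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
    byte[][] withNullKey = decode(encode(null, value));
    byte[][] withKey = decode(encode("k1".getBytes(StandardCharsets.UTF_8), value));
    System.out.println("null key round-trips as null: " + (withNullKey[0] == null));
    System.out.println("non-null key round-trips as: "
        + new String(withKey[0], StandardCharsets.UTF_8));
  }
}
```

In this sketch the flag is always written, so bytes produced without it cannot be read back by decode. That illustrates why changing the key encoding raises the backwards compatibility concern listed under option 1.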