Considering the related problem of populating KafkaRecord metadata (BEAM-12076 <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>), what's the plan there? Are we also going to make KafkaRecordCoder a well-known coder? I ask because this might be a good chance to revisit the KafkaRecordCoder implementation.
On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <[email protected]> wrote:

> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
>
>> Supporting the x-lang boundary is a good point. So you are suggesting that:
>>
>> 1. We make NullableCoder a standard coder.
>> 2. KafkaIO wraps the keyCoder with NullableCoder directly if required.
>>
>> Is that correct?
>
> Yeah.
>
>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>> I think we should make NullableCoder a standard coder for Beam [1] and use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might be the standard ByteArrayCoder, for example). I think we already have compatible Java and Python NullableCoder implementations, so implementing this should be relatively straightforward.
>>>
>>> Non-standard coders may not be supported by runners at the cross-language boundary.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1] https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>
>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>>
>>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]> @Chamikara Jayalath <[email protected]> @Alexey Romanenko <[email protected]> @Daniel Collins <[email protected]>
>>>>
>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was very helpful in identifying the issue, which is that KafkaRecordCoder couldn't handle the case where the key is null.
>>>>>
>>>>> We came up with two potential solutions. Both have their pros and cons, so I'm hoping to gather some suggestions/opinions or ideas on how to handle this issue. Our solutions:
>>>>>
>>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e. NullableCoder.of(keyCoder).
>>>>>    Cons: backwards-compatibility problem.
>>>>>
>>>>> 2. Write a completely new class named something like NullableKeyKafkaRecordCoder. Instead of using KvCoder to encode/decode KVs, we keep KeyCoder and ValueCoder as fields and add a BooleanCoder to encode/decode true/false for the presence of a null key. If the key is null, KeyCoder does not encode/decode:
>>>>>
>>>>>     encode(...) {
>>>>>       stringCoder.encode(topic, ...);
>>>>>       intCoder.encode(partition, ...);
>>>>>       longCoder.encode(offset, ...);
>>>>>       longCoder.encode(timestamp, ...);
>>>>>       intCoder.encode(timestampType, ...);
>>>>>       headerCoder.encode(...);
>>>>>       if (key != null) {
>>>>>         booleanCoder.encode(false, ...);
>>>>>         keyCoder.encode(key, ...);
>>>>>       } else {
>>>>>         booleanCoder.encode(true, ...);
>>>>>         // skip keyCoder when the key is null
>>>>>       }
>>>>>       valueCoder.encode(value, ...);
>>>>>     }
>>>>>
>>>>>     decode(...) {
>>>>>       return new KafkaRecord<>(
>>>>>           stringCoder.decode(inStream),
>>>>>           intCoder.decode(inStream),
>>>>>           longCoder.decode(inStream),
>>>>>           longCoder.decode(inStream),
>>>>>           KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>>           (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>>           booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>>>>>           valueCoder.decode(inStream));
>>>>>     }
>>>>>
>>>>> Best regards,
>>>>> Weiwen
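For concreteness, below is a minimal sketch of what option 1 could look like from the coder side, using the existing Java NullableCoder. The class name NullableKeySketch and the byte[] key/value types are purely illustrative, and the real change would of course live inside KafkaIO/KafkaRecordCoder rather than in user code. Note that wrapping the key coder changes the encoded byte layout, which is exactly the backwards-compatibility concern mentioned above.

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
    import org.apache.beam.sdk.util.CoderUtils;
    import org.apache.beam.sdk.values.KV;

    public class NullableKeySketch {
      public static void main(String[] args) throws Exception {
        // Option 1: wrap the key coder with NullableCoder so a null Kafka key
        // can be encoded at all.
        Coder<byte[]> keyCoder = NullableCoder.of(ByteArrayCoder.of());
        Coder<byte[]> valueCoder = ByteArrayCoder.of();

        // KafkaRecordCoder delegates the key/value portion to a KvCoder, so
        // the same wrapping applies to both.
        KafkaRecordCoder<byte[], byte[]> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
        KvCoder<byte[], byte[]> kvCoder = KvCoder.of(keyCoder, valueCoder);

        // A KV with a null key now round-trips; with the unwrapped
        // ByteArrayCoder the encode call would throw for a null key.
        KV<byte[], byte[]> kv = KV.of(null, "payload".getBytes(StandardCharsets.UTF_8));
        byte[] encoded = CoderUtils.encodeToByteArray(kvCoder, kv);
        KV<byte[], byte[]> decoded = CoderUtils.decodeFromByteArray(kvCoder, encoded);
        System.out.println("decoded key is null: " + (decoded.getKey() == null));
      }
    }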
