On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <[email protected]> wrote:
> Supporting the x-lang boundary is a good point. So you are suggesting that:
>
> 1. We make NullableCoder a standard coder.
> 2. KafkaIO wraps the keyCoder with NullableCoder directly if
> required.
>
> Is that correct?
>

Yeah.

>
> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> I think we should make NullableCoder a standard coder for Beam [1] and
>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might
>> be the standard ByteArrayCoder, for example).
>> I think we have compatible Java and Python NullableCoder implementations
>> already, so implementing this should be relatively straightforward.
>>
>> Non-standard coders may not be supported by runners at the cross-language
>> boundary.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>
>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <[email protected]> wrote:
>>
>>> /cc folks who commented on the issue: @Robin Qiu <[email protected]>
>>> @Chamikara Jayalath <[email protected]> @Alexey Romanenko
>>> <[email protected]> @Daniel Collins <[email protected]>
>>>
>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm working on [this issue](
>>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was
>>>> very helpful in identifying the issue, which is that KafkaRecordCoder
>>>> cannot handle a record whose key is null.
>>>>
>>>> We came up with two potential solutions. Both have pros and cons,
>>>> so I'm hoping to gather suggestions, opinions, or ideas on how to
>>>> handle this issue. Our solutions:
>>>>
>>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>>    NullableCoder.of(keyCoder)
>>>>    cons: backwards compatibility problem
>>>>
>>>> 2. Write a completely new class named something like
>>>>    NullableKeyKafkaRecordCoder.
>>>>    Instead of using KvCoder to encode/decode KVs, we keep KeyCoder
>>>>    and ValueCoder as fields, plus a BooleanCoder that encodes/decodes
>>>>    a true/false flag for the presence of a null key. If the key is
>>>>    null, KeyCoder is skipped on both encode and decode.
>>>>
>>>> - [L63] encode(...){
>>>>     stringCoder.encode(topic, ...);
>>>>     intCoder.encode(partition, ...);
>>>>     longCoder.encode(offset, ...);
>>>>     longCoder.encode(timestamp, ...);
>>>>     intCoder.encode(timestamptype, ...);
>>>>     headerCoder.encode(...);
>>>>     if (key != null) {
>>>>       BooleanCoder.encode(false, ...);
>>>>       KeyCoder.encode(key, ...);
>>>>     } else {
>>>>       BooleanCoder.encode(true, ...);
>>>>       // skips KeyCoder when key is null
>>>>     }
>>>>     ValueCoder.encode(value, ...);
>>>>   }
>>>>
>>>> - [L74] decode(...){
>>>>     return new KafkaRecord<>(
>>>>         stringCoder.decode(inStream),
>>>>         intCoder.decode(inStream),
>>>>         longCoder.decode(inStream),
>>>>         longCoder.decode(inStream),
>>>>         KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>         (Headers) toHeaders(headerCoder.decode(inStream)),
>>>>         BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
>>>>         ValueCoder.decode(inStream)
>>>>     );
>>>>   }
>>>>
>>>> Best regards,
>>>> Weiwen
>>>>
>>>
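
For anyone following along, the flag-then-key layout in option 2 can be tried out without Beam. What follows is only a rough stand-alone sketch, not the proposed KafkaRecordCoder change: NullableKeySketch and its methods are hypothetical names, it uses plain java.io streams instead of Beam coders, and the length-prefixed byte layout is an assumption made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-alone sketch; it does not use Beam's Coder classes.
final class NullableKeySketch {

  // Writes a null marker, then the key only when present, then the value.
  static byte[] encode(byte[] key, byte[] value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeBoolean(key == null); // true means "key is null"
      if (key != null) {
        out.writeInt(key.length);
        out.write(key);
      }
      out.writeInt(value.length);
      out.write(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the fields back in the same order; returns {key, value}.
  static byte[][] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] key = null;
      if (!in.readBoolean()) { // marker false means a key follows
        key = new byte[in.readInt()];
        in.readFully(key);
      }
      byte[] value = new byte[in.readInt()];
      in.readFully(value);
      return new byte[][] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A record with a null key costs one marker byte and no key bytes, and records with a key gain the same one-byte marker. That changes the encoded form either way, which is the backwards compatibility concern raised for option 1.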
