Supporting the x-lang boundary is a good point. So you are suggesting that:
1. We make NullableCoder a standard coder.
2. KafkaIO wraps the keyCoder with NullableCoder directly when required.

Is that correct?

On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath wrote:

> I think we should make NullableCoder a standard coder for Beam [1] and use
> a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might be
> the standard ByteArrayCoder, for example).
> I think we have compatible Java and Python NullableCoder implementations
> already, so implementing this should be relatively straightforward.
>
> Non-standard coders may not be supported by runners at the cross-language
> boundary.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>
> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay wrote:
>
>> /cc folks who commented on the issue: @Robin Qiu, @Chamikara Jayalath,
>> @Alexey Romanenko, @Daniel Collins
>>
>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu wrote:
>>
>>> Hello,
>>>
>>> I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008)
>>> with Boyuan. She was very helpful in identifying the problem, which is that
>>> KafkaRecordCoder can't handle the case where the key is null.
>>>
>>> We came up with two potential solutions, but each has its pros and cons,
>>> so I'm hoping to gather suggestions, opinions, or ideas on how to handle
>>> this issue. Our solutions:
>>>
>>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>>    NullableCoder.of(keyCoder)
>>>    cons: backwards compatibility problem
>>>
>>> 2. Write a completely new class, named something like
>>>    NullableKeyKafkaRecordCoder. Instead of using KvCoder to encode/decode
>>>    KVs, it has KeyCoder and ValueCoder as fields, plus a BooleanCoder that
>>>    encodes/decodes true/false to mark whether the key is null. If the key
>>>    is null, KeyCoder does not encode/decode anything.
>>>
>>> - [L63] encode(...) {
>>>       stringCoder.encode(topic, ...);
>>>       intCoder.encode(partition, ...);
>>>       longCoder.encode(offset, ...);
>>>       longCoder.encode(timestamp, ...);
>>>       intCoder.encode(timestampType.ordinal(), ...);
>>>       headerCoder.encode(...);
>>>       if (key != null) {
>>>         booleanCoder.encode(false, ...);
>>>         keyCoder.encode(key, ...);
>>>       } else {
>>>         booleanCoder.encode(true, ...);
>>>         // skip keyCoder when the key is null
>>>       }
>>>       valueCoder.encode(value, ...);
>>>   }
>>>
>>> - [L74] decode(...) {
>>>       return new KafkaRecord<>(
>>>           stringCoder.decode(inStream),
>>>           intCoder.decode(inStream),
>>>           longCoder.decode(inStream),
>>>           longCoder.decode(inStream),
>>>           KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>           (Headers) toHeaders(headerCoder.decode(inStream)),
>>>           booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>>>           valueCoder.decode(inStream));
>>>   }
>>>
>>> Best regards,
>>> Weiwen

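One way to see the backwards-compatibility concern with wrapping the key coder in place (option 1): bytes written by the existing key coder carry no null marker, so a decoder that expects one misreads them. The Beam-free sketch below is an assumption-laden model, not KafkaIO code. The "old" format is key then value, and the "new" format puts a one-byte marker (0 = null, 1 = present, a convention assumed for this sketch) before the key. The class `KeyCoderCompatSketch` and its methods are hypothetical. Presumably the concern matters wherever data already encoded with the old coder must be read by the new one, such as when updating a running streaming pipeline.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch: why changing a key encoding in place is not backwards compatible.
public class KeyCoderCompatSketch {
  static final int NULL_MARKER = 0;    // marker convention assumed for this sketch
  static final int PRESENT_MARKER = 1;

  // "Old" format: key and value written directly; it cannot represent a null key.
  static byte[] encodeOld(String key, String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeUTF(key);
      out.writeUTF(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // "New" format: a one-byte marker precedes the key, as with a nullable wrapper.
  static byte[] encodeNew(String key, String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (key == null) {
        out.writeByte(NULL_MARKER);
      } else {
        out.writeByte(PRESENT_MARKER);
        out.writeUTF(key);
      }
      out.writeUTF(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Decodes the "new" format and returns {key, value}.
  static String[] decodeNew(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int marker = in.readUnsignedByte();
    String key = marker == NULL_MARKER ? null : in.readUTF();
    String value = in.readUTF();
    return new String[] {key, value};
  }

  public static void main(String[] args) throws IOException {
    // Bytes written in the new format round-trip, including a null key.
    String[] fresh = decodeNew(encodeNew(null, "payload"));
    System.out.println(fresh[0] + " / " + fresh[1]); // null / payload

    // Old bytes: for a short key, the first byte of its length prefix (0x00) is read
    // as the null marker, so every field after it is misaligned.
    try {
      String[] misread = decodeNew(encodeOld("user-1", "payload"));
      System.out.println("misread: " + misread[0] + " / " + misread[1]);
    } catch (IOException e) {
      System.out.println("old bytes rejected: " + e.getClass().getSimpleName());
    }
  }
}
```

Here the misalignment happens to surface as an `EOFException`. With other inputs the same shift could instead decode silently into wrong values, which is arguably worse.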