I think we should make NullableCoder a standard coder for Beam [1] and use a standard NullableCoder(KeyCoder) for Kafka keys, where KeyCoder might be, for example, the standard ByteArrayCoder. I think we already have compatible Java and Python NullableCoder implementations, so implementing this should be relatively straightforward.
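To illustrate what a nullable key coder does on the wire, here is a minimal pure-JDK sketch, not Beam code. It assumes two hypothetical pieces: a SimpleCoder interface standing in for Beam's Coder, and a byte[] coder with a fixed 4-byte length prefix standing in for ByteArrayCoder. As I understand it, ByteArrayCoder uses a VarInt length prefix instead. The wrapper writes a 0 byte for a null value, or a 1 byte followed by the inner encoding otherwise. That is the general idea behind NullableCoder, but this sketch does not claim to reproduce its exact format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class NullableKeySketch {
  // Hypothetical minimal coder interface (NOT Beam's Coder API).
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  // Hypothetical byte[] coder: fixed 4-byte length prefix, then the raw bytes.
  static final SimpleCoder<byte[]> BYTES = new SimpleCoder<byte[]>() {
    @Override
    public void encode(byte[] value, OutputStream out) throws IOException {
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(value.length);
      data.write(value);
      data.flush();
    }

    @Override
    public byte[] decode(InputStream in) throws IOException {
      DataInputStream data = new DataInputStream(in);
      byte[] value = new byte[data.readInt()];
      data.readFully(value);
      return value;
    }
  };

  // Wraps any coder: writes 0 for null, otherwise 1 followed by the inner encoding.
  static <T> SimpleCoder<T> nullable(SimpleCoder<T> inner) {
    return new SimpleCoder<T>() {
      @Override
      public void encode(T value, OutputStream out) throws IOException {
        if (value == null) {
          out.write(0);
        } else {
          out.write(1);
          inner.encode(value, out);
        }
      }

      @Override
      public T decode(InputStream in) throws IOException {
        int marker = in.read();
        if (marker == 0) {
          return null;
        }
        if (marker != 1) {
          throw new IOException("unexpected null marker: " + marker);
        }
        return inner.decode(in);
      }
    };
  }

  // Helpers that convert checked IOExceptions into unchecked ones.
  static <T> byte[] encodeToBytes(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      coder.encode(value, out);
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static <T> T decodeFromBytes(SimpleCoder<T> coder, byte[] bytes) {
    try {
      return coder.decode(new ByteArrayInputStream(bytes));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    SimpleCoder<byte[]> keyCoder = nullable(BYTES);
    byte[] nullKey = encodeToBytes(keyCoder, null);
    byte[] someKey = encodeToBytes(keyCoder, new byte[] {7, 8});
    System.out.println(nullKey.length); // 1 (marker only)
    System.out.println(someKey.length); // 7 (marker + 4-byte length + 2 bytes)
    System.out.println(decodeFromBytes(keyCoder, nullKey) == null); // true
    System.out.println(Arrays.toString(decodeFromBytes(keyCoder, someKey))); // [7, 8]
  }
}
```

The point of the design is that the inner key coder never sees a null. Only the one-byte marker distinguishes "no key" from "key present".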
Runners may not support non-standard coders at the cross-language boundary.

Thanks,
Cham

[1] https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784

On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay wrote:

> /cc folks who commented on the issue: @Robin Qiu, @Chamikara Jayalath,
> @Alexey Romanenko, @Daniel Collins
>
> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu wrote:
>
>> Hello,
>>
>> I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008)
>> with Boyuan. She was very helpful in identifying the problem: KafkaRecordCoder
>> can't handle records whose key is null.
>>
>> We came up with two potential solutions, but both have their pros and cons,
>> so I'm hoping to gather suggestions, opinions, or ideas on how to handle
>> this issue. Our solutions:
>>
>> 1. Directly wrap the keyCoder with NullableCoder, i.e.
>>    NullableCoder.of(keyCoder).
>>    Cons: backwards compatibility problem.
>>
>> 2. Write a completely new class named something like
>>    NullableKeyKafkaRecordCoder. Instead of using KvCoder to encode/decode
>>    KVs, it holds the key coder and value coder as fields, plus a
>>    BooleanCoder that encodes/decodes a true/false flag marking a null key.
>>    If the key is null, the key coder is skipped on both encode and decode.
>>
>>    - encode(...) {
>>        stringCoder.encode(topic, ...);
>>        intCoder.encode(partition, ...);
>>        longCoder.encode(offset, ...);
>>        longCoder.encode(timestamp, ...);
>>        intCoder.encode(timestampType, ...);
>>        headerCoder.encode(...);
>>        if (key != null) {
>>          booleanCoder.encode(false, ...);
>>          keyCoder.encode(key, ...);
>>        } else {
>>          booleanCoder.encode(true, ...);
>>          // skip keyCoder when the key is null
>>        }
>>        valueCoder.encode(value, ...);
>>      }
>>
>>    - decode(...) {
>>        // Java evaluates arguments left to right, so this reads fields
>>        // in the same order encode(...) wrote them.
>>        return new KafkaRecord<>(
>>            stringCoder.decode(inStream),
>>            intCoder.decode(inStream),
>>            longCoder.decode(inStream),
>>            longCoder.decode(inStream),
>>            KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>            (Headers) toHeaders(headerCoder.decode(inStream)),
>>            booleanCoder.decode(inStream) ? null : keyCoder.decode(inStream),
>>            valueCoder.decode(inStream));
>>      }
>>
>> Best regards,
>> Weiwen
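To make the backwards-compatibility concern with option 1 concrete, consider what happens when the key coder is wrapped in place. The byte layout changes, so bytes written with the old layout get misread by the new one. The sketch below is a hypothetical pure-JDK stand-in, not KafkaRecordCoder. It reduces a record to a String key and value written with DataOutputStream.writeUTF, and models the wrapped layout as a single marker byte (0 = null key, 1 = present) before the key. Decoding old bytes with the new layout either fails outright or, for an empty key, silently yields a null key and the wrong value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WireFormatChangeSketch {
  // Old layout (hypothetical stand-in): key, then value, no null marker.
  static byte[] encodeOld(String key, String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeUTF(key); // 2-byte length prefix + modified UTF-8
      out.writeUTF(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // New layout: a marker byte (0 = null key, 1 = key present) before the key.
  static byte[] encodeNew(String key, String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (key == null) {
        out.writeByte(0);
      } else {
        out.writeByte(1);
        out.writeUTF(key);
      }
      out.writeUTF(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Decodes with the new layout and returns {key, value}.
  static String[] decodeNew(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      String key = in.readUnsignedByte() == 0 ? null : in.readUTF();
      String value = in.readUTF();
      return new String[] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // True if decodeNew completes without throwing.
  static boolean decodesCleanly(byte[] encoded) {
    try {
      decodeNew(encoded);
      return true;
    } catch (UncheckedIOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // New bytes round-trip, including a null key.
    System.out.println(decodesCleanly(encodeNew(null, "v1"))); // true
    // Old bytes for key "k1": the key's first length byte (0) is read as the null
    // marker, the next two bytes become a bogus length, and decoding hits EOF.
    System.out.println(decodesCleanly(encodeOld("k1", "v1"))); // false
    // Old bytes for an empty key decode without error, but to the wrong record.
    String[] misread = decodeNew(encodeOld("", "v1"));
    System.out.println((misread[0] == null) + " " + misread[1].isEmpty()); // true true
  }
}
```

The silent-misread case is arguably the more worrying of the two, since nothing fails loudly. A new coder class (option 2) avoids this for existing data only because old pipelines keep using the old coder.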
