Yaroslav Tkachenko created BEAM-13320:
-----------------------------------------

             Summary: KafkaIO: inconsistent behaviour with Beam Row coders
                 Key: BEAM-13320
                 URL: https://issues.apache.org/jira/browse/BEAM-13320
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
            Reporter: Yaroslav Tkachenko


We've been using KafkaIO's _withValueDeserializerAndCoder_ method to provide a 
Beam Row coder for many versions of Beam. However, it stopped working in 2.30, 
after the _ReadFromKafkaViaSDF_ implementation was made the default in BEAM-12114.

As far as I can see, _ReadFromKafkaViaUnbounded_ simply uses the key and the 
value coders that were passed with _withValueDeserializerAndCoder._
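For reference, this is the call pattern that used to work (a minimal sketch; the bootstrap servers, topic, schema, and _MyRowDeserializer_ are placeholders, not from the report):

```java
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical schema for illustration only.
Schema schema = Schema.builder().addStringField("name").build();

KafkaIO.Read<String, Row> read =
    KafkaIO.<String, Row>read()
        .withBootstrapServers("localhost:9092")   // placeholder
        .withTopic("my-topic")                    // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        // The explicit Row coder passed here is used by
        // ReadFromKafkaViaUnbounded, but ReadFromKafkaViaSDF ignores it
        // and falls back to CoderRegistry inference instead.
        .withValueDeserializerAndCoder(
            MyRowDeserializer.class, RowCoder.of(schema));
```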

But _ReadFromKafkaViaSDF_ relies on an internal _ReadSourceDescriptors_ class, 
which, for some reason, doesn't receive the provided coders directly. Instead, 
it leverages _DeserializerProvider_, which uses the _CoderRegistry_ to infer 
the coders. BEAM-9569 added a 
[check|https://github.com/apache/beam/pull/10990/files#diff-fbe73e228ab8f76836ebfa899382731fe93c739f8cdb84ba9841b8f9591df175]
 that effectively prevents using the CoderRegistry with Beam Row objects. 
{*}This prevents us from using Beam Rows with KafkaIO{*}.

As a workaround, we can use _--experiments=beam_fn_api_use_deprecated_read_ to 
force the _ReadFromKafkaViaUnbounded_ implementation, but I'm afraid it'll 
eventually be deleted.

So, I feel like either:
 * The existing KafkaIO documentation or examples need to be updated to show 
how to rely on schemas when using Beam Rows, as was suggested in BEAM-9569. But 
I don't see how that can work with the existing implementation of KafkaIO.
 * Or _ReadFromKafkaViaSDF_ needs to use the provided coders instead of falling 
back to {_}DeserializerProvider{_}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)