[ 
https://issues.apache.org/jira/browse/BEAM-13320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501993#comment-17501993
 ] 

Beam JIRA Bot commented on BEAM-13320:
--------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 
days. It is now automatically moved to P3. If you are still affected by it, you 
can comment and move it back to P2.

> KafkaIO: inconsistent behaviour with Beam Row coders
> ----------------------------------------------------
>
>                 Key: BEAM-13320
>                 URL: https://issues.apache.org/jira/browse/BEAM-13320
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Yaroslav Tkachenko
>            Priority: P3
>
> We've been using KafkaIO's _withValueDeserializerAndCoder_ method to provide 
> a Beam Row Coder for many versions of Beam. However, it stopped working in 
> 2.30 after the _ReadFromKafkaViaSDF_ implementation was made the default in 
> BEAM-12114.
> As far as I can see, _ReadFromKafkaViaUnbounded_ simply uses the key and the 
> value coders that were passed with _withValueDeserializerAndCoder._
> But _ReadFromKafkaViaSDF_ relies on an internal _ReadSourceDescriptors_ 
> class, which, for some reason, doesn't receive the provided coders directly. 
> Instead, it leverages _DeserializerProvider_ which uses the _CoderRegistry_ 
> to get the coders. BEAM-9569 added a 
> [check|https://github.com/apache/beam/pull/10990/files#diff-fbe73e228ab8f76836ebfa899382731fe93c739f8cdb84ba9841b8f9591df175]
>  that practically prevents using _CoderRegistry_ with Beam Row objects. {*}This 
> prevents us from using Beam Rows with KafkaIO{*}.
> As a workaround, we can use _--experiments=beam_fn_api_use_deprecated_read_ 
> to force the _ReadFromKafkaViaUnbounded_ implementation, but I'm afraid it 
> will eventually be deleted. 
> So, I feel like either:
>  * The existing KafkaIO documentation or examples need to be updated to show 
> how to rely on schemas when using Beam Rows, as was suggested in BEAM-9569. 
> But I don't see how it can work with the existing implementation of KafkaIO.
>  * Or _ReadFromKafkaViaSDF_ needs to use the provided coders and not fall back 
> to {_}DeserializerProvider{_}. 
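
The difference between the two read paths described above can be illustrated with a toy model. This is only a sketch in plain Java and does not use Beam's classes: CoderPathSketch, ToyRegistry, both resolve methods, the coder labels, and the error message are all hypothetical names made up for illustration. It assumes, as the report states, that one path uses the coder supplied by the caller while the other ignores it and asks a registry that refuses Row.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these types are Beam classes.
final class CoderPathSketch {

    // Hypothetical stand-in for a coder registry that refuses to infer a coder for "Row".
    static final class ToyRegistry {
        private final Map<String, String> codersByType = new HashMap<>();

        ToyRegistry() {
            codersByType.put("String", "StringUtf8Coder");
            codersByType.put("Long", "VarLongCoder");
        }

        String getCoder(String typeName) {
            // Models the kind of check the report attributes to BEAM-9569 (message is made up).
            if (typeName.equals("Row")) {
                throw new IllegalStateException("cannot infer a coder for Row without a schema");
            }
            String coder = codersByType.get(typeName);
            if (coder == null) {
                throw new IllegalStateException("no coder registered for " + typeName);
            }
            return coder;
        }
    }

    // Models the path that honours the caller-supplied coder and only
    // consults the registry when no coder was supplied.
    static String resolveUsingProvidedCoder(String typeName, String providedCoder, ToyRegistry registry) {
        return providedCoder != null ? providedCoder : registry.getCoder(typeName);
    }

    // Models the path that ignores the caller-supplied coder and always asks the registry.
    static String resolveUsingRegistryOnly(String typeName, String providedCoder, ToyRegistry registry) {
        return registry.getCoder(typeName);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        System.out.println(resolveUsingProvidedCoder("Row", "RowCoder(schema)", registry));
        try {
            resolveUsingRegistryOnly("Row", "RowCoder(schema)", registry);
        } catch (IllegalStateException e) {
            System.out.println("registry path failed: " + e.getMessage());
        }
    }
}
```

In this model the second option from the list above corresponds to making the registry-only path behave like the first method, so that a supplied coder is never discarded.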



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
