damccorm opened a new issue, #21189: URL: https://github.com/apache/beam/issues/21189
We've been using KafkaIO's _withValueDeserializerAndCoder_ method to provide a Beam Row Coder for many versions of Beam; however, it stopped working in 2.30 after the _ReadFromKafkaViaSDF_ implementation was made the default in BEAM-12114.

As far as I can see, _ReadFromKafkaViaUnbounded_ simply uses the key and value coders that were passed with _withValueDeserializerAndCoder_. But _ReadFromKafkaViaSDF_ relies on an internal _ReadSourceDescriptors_ class, which, for some reason, doesn't receive the provided coders directly. Instead, it leverages _DeserializerProvider_, which uses the _CoderRegistry_ to get the coders. BEAM-9569 added a [check](https://github.com/apache/beam/pull/10990/files#diff-fbe73e228ab8f76836ebfa899382731fe93c739f8cdb84ba9841b8f9591df175) that effectively prevents using the CoderRegistry with Beam Row objects. __This prevents us from using Beam Rows with KafkaIO.__

As a workaround, we can use _\--experiments=beam_fn_api_use_deprecated_read_ to force the _ReadFromKafkaViaUnbounded_ implementation, but I'm afraid it will eventually be deleted. So, I feel like either:

* The existing KafkaIO documentation or examples need to be updated to show how to rely on schemas when using Beam Rows, as was suggested in BEAM-9569. But I don't see how it can work with the existing implementation of KafkaIO.
* Or _ReadFromKafkaViaSDF_ needs to use the provided coders and not fall back to _DeserializerProvider_.

Imported from Jira [BEAM-13320](https://issues.apache.org/jira/browse/BEAM-13320). Original Jira may contain additional context.
Reported by: sap1ens.
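For reference, the pattern described above can be sketched as follows. This is a minimal illustration, not a definitive repro: the topic, bootstrap servers, schema, and `MyRowDeserializer` (a hypothetical `Deserializer<Row>` implementation) are all assumptions for the example; only `withValueDeserializerAndCoder` and `RowCoder.of` are the real KafkaIO/Beam APIs being discussed.

```java
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical schema for the Kafka record values.
Schema schema = Schema.builder()
    .addStringField("name")
    .addInt64Field("count")
    .build();

pipeline.apply(
    KafkaIO.<String, Row>read()
        .withBootstrapServers("localhost:9092")   // assumed broker address
        .withTopic("events")                      // assumed topic name
        .withKeyDeserializer(StringDeserializer.class)
        // The explicit RowCoder here is honored by the legacy
        // ReadFromKafkaViaUnbounded path, but ReadFromKafkaViaSDF instead
        // resolves the coder through DeserializerProvider/CoderRegistry,
        // which rejects Row (the BEAM-9569 check), so this fails on 2.30+.
        .withValueDeserializerAndCoder(MyRowDeserializer.class, RowCoder.of(schema)));
```

To run this on Beam 2.30+ without hitting the CoderRegistry check, the pipeline currently has to be launched with `--experiments=beam_fn_api_use_deprecated_read` to force the legacy read implementation.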
