[https://issues.apache.org/jira/browse/BEAM-13924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Jonas Grabber updated BEAM-13924:
---------------------------------
Description:
When upgrading from 2.29.0 to 2.36.0, our Kafka Read transform broke.
While debugging, I saw that the key and value coder information is lost
after [this statement in KafkaIO|https://github.com/apache/beam/blob/v2.36.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1459]:
{code:java}
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
{code}
The issue seems to be that {{output.apply}} already checks for the presence of
key and value coders and ends up trying to infer them with the help of
[LocalDeserializerProvider|https://github.com/apache/beam/blob/v2.36.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java],
so execution never reaches {{setCoder}}.
In our case this inference fails because we implement the {{Deserializer}}
interface in a superclass of the instance passed as the deserializer.
This still worked on 2.29.0, so any version after that could be affected.
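The failing inference can be illustrated with plain Java reflection. This is a minimal sketch, assuming that the lookup in {{LocalDeserializerProvider}} only inspects interfaces declared directly on the deserializer's class (via {{Class.getGenericInterfaces()}}); that detail is an assumption, not quoted from the Beam source. {{Deserializer}} below is a hypothetical local stand-in for Kafka's {{org.apache.kafka.common.serialization.Deserializer}}, and all class and method names ({{directLookup}}, {{hierarchyLookup}}, {{RedeclaringDeserializer}}, ...) are illustrative. {{getGenericInterfaces()}} returns only the interfaces a class itself declares, so a deserializer that inherits {{Deserializer}} from a superclass yields nothing.

{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

public class Main {

  // Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer,
  // so this sketch compiles without kafka-clients on the classpath.
  interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Declares the interface itself: getGenericInterfaces() sees Deserializer<String>.
  static class DirectDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }
  }

  // The pattern from this report: the interface is implemented by a superclass.
  static class BaseDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }
  }

  static class SubclassDeserializer extends BaseDeserializer {}

  // Possible user-side workaround: redeclare the (already inherited) interface.
  static class RedeclaringDeserializer extends BaseDeserializer
      implements Deserializer<String> {}

  // Inspects only interfaces declared directly on clazz (assumed inference behaviour).
  static Type directLookup(Class<?> clazz) {
    for (Type iface : clazz.getGenericInterfaces()) {
      if (iface instanceof ParameterizedType) {
        ParameterizedType p = (ParameterizedType) iface;
        if (p.getRawType() == Deserializer.class) {
          return p.getActualTypeArguments()[0];
        }
      }
    }
    return null; // nothing found on this class itself
  }

  // Walks the superclass chain; does NOT resolve type variables or sub-interfaces.
  static Type hierarchyLookup(Class<?> clazz) {
    for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
      Type found = directLookup(c);
      if (found != null) {
        return found;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(directLookup(DirectDeserializer.class));      // class java.lang.String
    System.out.println(directLookup(SubclassDeserializer.class));    // null
    System.out.println(hierarchyLookup(SubclassDeserializer.class)); // class java.lang.String
    System.out.println(directLookup(RedeclaringDeserializer.class)); // class java.lang.String
  }
}
{code}

The hierarchy walk is deliberately simple: for {{BaseDeserializer<T> implements Deserializer<T>}} with a subclass binding {{T}} to {{String}}, it would return the type variable {{T}} rather than {{String}}, and it ignores interfaces that merely extend {{Deserializer}}. Under the same assumption, redeclaring {{implements Deserializer<String>}} on the concrete class (as in {{RedeclaringDeserializer}}) may sidestep a direct-only lookup.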
was:
When upgrading from 2.29.0 to 2.36.0 our Kafka Read transform broke.
While debugging, I saw that information of the key and value coders is lost
after [this statement in
KafkaIO|https://github.com/apache/beam/blob/v2.36.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1459]:
{code:java}
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder,
valueCoder));
{code}
The issue seems to be that output.apply already checks for the presence of key
and value coders and ends up trying to infer them with the help of
[LocalDeserializerProvider|https://github.com/apache/beam/blob/v2.36.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java],
so the process never arrives at setCoder.
In our case this inference fails since we implement the Deserializer interface
in a super class of the instance passed as the Deserializer.
This was not yet broken on 2.29.0, so all versions after that could be affected.
> Coder information lost in Kafka Read
> ------------------------------------
>
> Key: BEAM-13924
> URL: https://issues.apache.org/jira/browse/BEAM-13924
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.36.0
> Reporter: Jonas Grabber
> Priority: P2
> Labels: bug, kafka
--
This message was sent by Atlassian Jira
(v8.20.1#820001)