nbali edited a comment on pull request #16888:
URL: https://github.com/apache/beam/pull/16888#issuecomment-1048214139


   @jobegrabber
   I doubt this PR would be the right workaround for that issue.
   * If the problem is the SDF implementation and how differently it sets up the coders, then you can manually force KafkaIO to use the legacy read. You don't need this override for that; just pick an experiment flag to use:
   
https://github.com/apache/beam/blob/31332b8f0c39e94f4364a5ec1148c6450c5e78f4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1307-L1310
   * If both implementations (legacy and SDF) have issues with coders, then this replacement shouldn't matter. Exactly the same coder that was used before should be passed when the override this PR is trying to remove happens. So I don't see how having or not having this override interferes with your coders, if you provided them in the "intended" way.
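   To illustrate the first point: forcing the legacy read is just a matter of passing an experiment flag at launch. A minimal sketch, assuming a launcher jar named `my-pipeline.jar` (a placeholder) and the `use_deprecated_read` flag checked by recent KafkaIO versions; verify the exact flag name against the linked lines for your Beam version:

   ```shell
   # Sketch: force KafkaIO onto the legacy (non-SDF) read path.
   # "my-pipeline.jar" is a placeholder; the flag name depends on your Beam version.
   java -jar my-pipeline.jar \
       --experiments=use_deprecated_read
   ```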
   
   I think it would be helpful if you copied the code snippet where your KafkaIO.Read transform is configured, so we can see exactly how you are providing serializers/coders.
   
   You can check for yourself that it keeps the same coder during the override:
   
https://github.com/apache/beam/blob/81f3a16bccce6b2a5cb0978e71856ea53eb3bbde/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1342-L1348
   
   ... and these coders are being calculated at:
   
https://github.com/apache/beam/blob/81f3a16bccce6b2a5cb0978e71856ea53eb3bbde/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1511-L1521
   
   Are you using the proper methods to set these up 
(`withKeyDeserializer`/`withKeyDeserializerAndCoder`/etc)?  
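   For reference, this is the "intended" way I mean: providing the deserializer together with an explicit coder. A minimal sketch, where the bootstrap servers and topic name are placeholders (not from your pipeline):

   ```java
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.io.kafka.KafkaIO;
   import org.apache.kafka.common.serialization.StringDeserializer;

   // Sketch: configure KafkaIO.Read with explicit coders so nothing has to be
   // inferred from the deserializer type. Server/topic values are placeholders.
   KafkaIO.Read<String, String> read =
       KafkaIO.<String, String>read()
           .withBootstrapServers("localhost:9092")
           .withTopic("my-topic")
           .withKeyDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
           .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of());
   ```

   If you only call `withKeyDeserializer`/`withValueDeserializer`, the coder is inferred from the deserializer type instead, which is where a mismatch could creep in.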


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]