Unrecoverable exceptions in Kafka fetch should result in an IOException in the
reader.

KafkaConsumer.poll() can throw unrecoverable exceptions, and KafkaIO did not
handle them well: the poll thread silently died. Such a failure should instead
surface as an IOException thrown from the unbounded reader's start()/advance().

Fix: The consumer poll thread saves the exception before exiting, and the
reader checks for it before trying to serve more records. Added a unit
test to verify.
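
For readers unfamiliar with the pattern, here is a minimal sketch of the idea described in the fix. It is not the code in this PR: SketchReader, pollLoop(), nextBatch(), and the Callable standing in for KafkaConsumer.poll() are all hypothetical names chosen for illustration, and the hand-off queue is only an assumption about how records reach the reader.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for an unbounded reader; not KafkaIO's actual code. */
class SketchReader {
  // Stands in for KafkaConsumer.poll(); may throw an unrecoverable exception.
  private final Callable<List<String>> poller;
  // Hands batches from the poll thread to the reader.
  private final SynchronousQueue<List<String>> available = new SynchronousQueue<>();
  // Set by the poll thread just before it exits on a failure.
  private final AtomicReference<Exception> pollException = new AtomicReference<>();
  private final Thread pollThread;

  SketchReader(Callable<List<String>> poller) {
    this.poller = poller;
    this.pollThread = new Thread(this::pollLoop, "sketch-poll-thread");
    this.pollThread.setDaemon(true);
  }

  void start() {
    pollThread.start();
  }

  private void pollLoop() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        List<String> batch = poller.call();
        // Keep offering until the reader takes the batch.
        while (!available.offer(batch, 10, TimeUnit.MILLISECONDS)) {
          // Retry; an interrupt makes offer() throw and ends the loop.
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // normal shutdown, nothing to report
    } catch (Exception e) {
      pollException.set(e); // save the failure instead of dying silently
    }
  }

  /** Returns the next batch, or an empty list if none is ready yet. */
  List<String> nextBatch() throws IOException {
    // Check for a saved failure before trying to serve more records.
    Exception failure = pollException.get();
    if (failure != null) {
      throw new IOException("Exception while reading from Kafka", failure);
    }
    try {
      List<String> batch = available.poll(10, TimeUnit.MILLISECONDS);
      return batch == null ? Collections.<String>emptyList() : batch;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
  }
}
```

In this sketch a failure raised in the poll thread may be observed one call late, because the reader only checks the saved exception at the start of nextBatch(). That seems acceptable for an unbounded reader that is called repeatedly, but it is a property of the sketch, not a statement about the actual change.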





[ Full content available at: https://github.com/apache/beam/pull/6391 ]