aljoscha opened a new pull request #12589:
URL: https://github.com/apache/flink/pull/12589


   ## What is the purpose of the change
   
   This fixes thread leaks that can cause the watchdog to kill Flink TaskManagers.
   
   A summary of the bug:
    - our code calls `close()` on the `KafkaProducer` (Kafka code), which is equivalent to calling `close()` with a timeout of `Long.MAX_VALUE`
    - this means threads leak when a failure happens, for example during broker downtime
    - after a timeout, the Flink task watchdog kills the TaskManager because of these lingering threads

   The fix is to always call `close()` with an explicit, finite timeout.
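
The failure mode above can be illustrated with a small, self-contained simulation. `StuckSenderProducer` is a hypothetical class, not Kafka's `KafkaProducer`; the sketch only assumes that the producer owns a background sender thread that blocks while brokers are unreachable, and that a timed close forcibly stops that thread once the timeout has expired.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a producer whose background sender thread is
// stuck waiting for a broker that never answers. This is NOT Kafka's code.
class StuckSenderProducer {
    // Never counted down: models acknowledgements that never arrive during broker downtime.
    private final CountDownLatch brokerAck = new CountDownLatch(1);
    private final Thread sender;

    StuckSenderProducer() {
        sender = new Thread(() -> {
            try {
                brokerAck.await(); // blocks until the thread is interrupted
            } catch (InterruptedException e) {
                // Forced shutdown: let the thread exit.
            }
        }, "hypothetical-sender");
        sender.setDaemon(true); // keep a failed demo from hanging the JVM
        sender.start();
    }

    // Mirrors the no-arg close(): an effectively unbounded wait.
    // Calling it here would block (practically) forever.
    void close() {
        close(Duration.ofMillis(Long.MAX_VALUE));
    }

    // Waits up to 'timeout' for the sender to finish, then forces it to stop.
    void close(Duration timeout) {
        try {
            long millis = timeout.toMillis();
            if (millis > 0) {
                sender.join(millis); // note: join(0) would mean "wait forever"
            }
            if (sender.isAlive()) {
                sender.interrupt(); // force-close the stuck sender
                sender.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean senderAlive() {
        return sender.isAlive();
    }
}
```

With a bounded timeout, `close(Duration.ofMillis(50))` returns promptly and leaves no sender thread behind, whereas the no-arg `close()` would keep the calling thread waiting for as long as the simulated broker stays silent.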
   
   The fix also requires a Kafka version bump because of KAFKA-6635/KAFKA-7763, which cause resources to leak even when closing with a timeout. Additionally, we need to close with a timeout of exactly zero, because otherwise in-flight transactions are aborted.
   
   ## Brief change log
   
   - always call `close()` with a zero timeout
   - bump the Kafka version; this requires some changes to how we access the `KafkaProducer` internals via reflection, because those internals changed
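
Reflective access like this usually looks fields up by name, so a renamed or removed private field in a new Kafka release breaks it at runtime rather than at compile time. The sketch below illustrates that general pattern only; `ReflectionUtil`, `InternalState`, and the field name `producerId` are hypothetical and not taken from the Flink or Kafka sources.

```java
import java.lang.reflect.Field;

// Hypothetical helper for reading private state of a library class by field name.
final class ReflectionUtil {
    private ReflectionUtil() {}

    // Reads a (possibly private) field by name, searching superclasses too.
    // If a library upgrade renames or removes the field, this fails loudly.
    static Object getField(Object target, String fieldName) {
        Class<?> clazz = target.getClass();
        while (clazz != null) {
            try {
                Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException e) {
                clazz = clazz.getSuperclass(); // keep searching up the hierarchy
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot access field " + fieldName, e);
            }
        }
        throw new IllegalStateException(
            "Field '" + fieldName + "' not found on " + target.getClass().getName()
                + "; the internals may have changed after a version bump");
    }
}

// Hypothetical class standing in for a library's private internals.
class InternalState {
    private final long producerId = 42L;
}
```

Failing with an explicit message when a field is missing makes an incompatible dependency bump show up as a clear error instead of a silent misbehavior.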
   
   I added code that forbids calling `close()` without a timeout by throwing an exception. Alternatively, I could change it to call `close(0)` under the hood, i.e. always enforce a zero timeout from the "inside".
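
Both options can be sketched with a small wrapper. `TimedCloseable` and `ZeroTimeoutGuard` are hypothetical names used for illustration; the sketch assumes only that the wrapped producer offers a `close(Duration)` variant, and it uses `Duration.ZERO` to stand in for the zero timeout.

```java
import java.time.Duration;

// Hypothetical minimal interface for a resource that must be closed with a timeout.
interface TimedCloseable {
    void close(Duration timeout);
}

// Hypothetical wrapper showing the two options: reject the no-arg close(),
// or silently redirect it to close(Duration.ZERO).
class ZeroTimeoutGuard implements AutoCloseable {
    private final TimedCloseable delegate;
    private final boolean redirectToZero;

    ZeroTimeoutGuard(TimedCloseable delegate, boolean redirectToZero) {
        this.delegate = delegate;
        this.redirectToZero = redirectToZero;
    }

    @Override
    public void close() {
        if (redirectToZero) {
            // Option 2: enforce the zero timeout from the "inside".
            delegate.close(Duration.ZERO);
        } else {
            // Option 1: forbid closing without an explicit timeout.
            throw new UnsupportedOperationException(
                "close() without a timeout is not allowed; use close(Duration.ZERO)");
        }
    }

    // Explicit timeouts are passed through unchanged.
    void close(Duration timeout) {
        delegate.close(timeout);
    }
}
```

Throwing makes every accidental no-arg call visible immediately, while redirecting keeps existing call sites working but hides the fact that the timeout is being overridden.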
   
   ## Verifying this change
   
   This is covered by existing tests. I didn't add a test that verifies the fix for the original bug, because it would essentially test Kafka internals. If necessary, I might be able to whip something up.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes, Kafka
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
   Doesn't introduce a new feature.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.