becketqin opened a new pull request #9224: [FLINK-13226] [connectors / kafka] 
Fix race condition between transaction commit and producer closure.
URL: https://github.com/apache/flink/pull/9224
 
   
   ## What is the purpose of the change
   This patch fixes a race condition between the checkpointing thread and the 
main thread that can deadlock the task. The sequence causing the deadlock is 
the following:
   1. In `FlinkKafkaProducer`, the main thread encounters a problem and closes 
all the producers to start failover.
   2. The previous checkpoint has completed, so the checkpointing thread grabs 
the checkpoint lock and tries to commit the transaction on the producer that 
was closed in step 1. This commit will never succeed due to 
[KAFKA-6635](https://issues.apache.org/jira/browse/KAFKA-6635), so the 
checkpointing thread blocks forever.
   3. In `StreamTask`, the main thread will eventually try to release all the 
record writers. To do that, it attempts to grab the checkpoint lock, which is 
held by the checkpointing thread from step 2 and will never be released. So the 
main thread also blocks forever.
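
The lock interplay in steps 2 and 3 can be sketched in isolation. This is only an illustration, not Flink code: `CheckpointLockStall` and `mainThreadGetsLock` are hypothetical names, a `ReentrantLock` with a timed `tryLock` stands in for the checkpoint lock so that the demo terminates instead of hanging, and a latch that is never counted down stands in for the commit that never returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of steps 2 and 3; none of these names exist in Flink.
final class CheckpointLockStall {

    // Returns true if the "main" thread obtained the lock within the timeout.
    static boolean mainThreadGetsLock(long timeoutMillis) {
        ReentrantLock checkpointLock = new ReentrantLock();
        // Never counted down: models a commit on a closed producer that never completes.
        CountDownLatch commitDone = new CountDownLatch(1);
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread checkpointThread = new Thread(() -> {
            checkpointLock.lock();
            try {
                lockHeld.countDown();
                commitDone.await(); // step 2: blocks while holding the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                checkpointLock.unlock();
            }
        });
        checkpointThread.setDaemon(true);
        checkpointThread.start();

        try {
            lockHeld.await();
            // Step 3: the main thread needs the same lock to release its resources.
            boolean acquired = checkpointLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                checkpointLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Only the demo can do this; in the scenario above nothing unblocks the thread.
            checkpointThread.interrupt();
        }
    }
}
```

With an untimed lock acquisition, as in the scenario described above, the main thread would wait indefinitely; the timeout here exists only to make the stall observable.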
   
   KAFKA-6635 has been fixed in Kafka 2.3.0, but Flink 1.9 does not depend on 
that version yet, and we also support Kafka 0.11. So we fix the problem on the 
Flink side first. The solution is to make sure that in `FlinkKafkaProducer` any 
operation relying on the underlying sender thread to finish throws an exception 
if the producer is closed.
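
A minimal sketch of that idea, assuming a wrapper that tracks a closed flag under a lock. `GuardedProducer`, `closingLock`, `ensureNotClosed`, and `committedCount` are hypothetical names for illustration and are not taken from the patch; the counter stands in for the real blocking call on the Kafka producer.

```java
// Hypothetical wrapper for illustration; not the actual FlinkKafkaInternalProducer.
final class GuardedProducer {
    private final Object closingLock = new Object();
    private boolean closed = false; // guarded by closingLock
    private int committed = 0;      // guarded by closingLock

    // Stands in for any operation that waits on the sender thread.
    void commitTransaction() {
        synchronized (closingLock) {
            ensureNotClosed();
            committed++; // placeholder for the real blocking commit
        }
    }

    void close() {
        synchronized (closingLock) {
            closed = true; // later blocking calls now fail fast instead of hanging
        }
    }

    int committedCount() {
        synchronized (closingLock) {
            return committed;
        }
    }

    private void ensureNotClosed() {
        if (closed) {
            throw new IllegalStateException("The producer has already been closed");
        }
    }
}
```

Checking the flag and performing the operation under the same lock as `close()` is what prevents a commit from starting on a producer that is concurrently being closed; the caller then sees an exception rather than blocking forever.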
   
   This patch also fixes a minor issue: a duplicated static inner class name in 
`KafkaConsumerTestBase`.
   
   ## Brief change log
   - Make `FlinkKafkaProducer` and `FlinkKafkaInternalProducer` thread-safe.
   - Fix static inner class name collision in `KafkaConsumerTestBase`.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added a test to ensure that an exception is thrown if a blocking method is 
called on `FlinkKafkaProducer` or `FlinkKafkaInternalProducer` after the 
producer is closed.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   
