[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689665#comment-16689665
 ] 

ASF GitHub Bot commented on FLINK-10455:
----------------------------------------

pnowojski commented on issue #7108: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/7108#issuecomment-439457861
 
 
   Thanks @azagrebin merged

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get an {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to