[
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870121#comment-15870121
]
Buğra Gedik edited comment on KAFKA-4767 at 2/16/17 3:16 PM:
-------------------------------------------------------------
I understand that it will eventually go away. But that does not cut it for us.
We would like the IO thread to be shutdown after {{close}} returns. And that
does not happen if we get an interrupt during close(). Yes, eventually it will
go away. However, various software such as Tomcat, has thread leak detectors
and they are turned on by this behavior. Once join is interrupted, we (client
code that uses KafkaProducer) have no way of 'waiting' until the IO thread goes
away.
was (Author: bgedik):
I understand that it will eventually go away. But that does not cut it for us.
We would like the IO thread to be shutdown after {{close}} returns. And that
does not happen if we get an interrupt during close(). Yes, eventually it will
go away. However, various software such as Tomcat, has thread leak detectors
and they are turned on by this behavior. Once join is interrupted, we have no
way of 'waiting' until the IO thread goes away.
> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
> Key: KAFKA-4767
> URL: https://issues.apache.org/jira/browse/KAFKA-4767
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.11.0.0
> Reporter: Buğra Gedik
> Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code
> is like this:
> {code}
> try {
> this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
> firstException.compareAndSet(null, t);
> log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving
> the io thread running. The correct way of handling this is a follows:
> {code}
> try {
> this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
> // propagate the interrupt
> this.ioThread.interrupt();
> try {
> this.ioThread.join();
> } catch (InterruptedException t) {
> firstException.compareAndSet(null, t);
> log.error("Interrupted while joining ioThread", t);
> } finally {
> // make sure we maintain the interrupted status
> Thread.currentThread.interrupt();
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)