[ https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869382#comment-15869382 ]
huxi commented on KAFKA-4767:
-----------------------------

I think I see what you mean. Your concern is that the IO thread might fail to shut down, or be left in an inconsistent state, if the user thread is interrupted. Am I right?

1. `this.sender.initiateClose()` is not interruptible, so the user thread can always initiate a close of the IO thread, even after the user thread has been interrupted somewhere.
2. I agree that we should restore the interruption status of the user thread after catching the InterruptedException, which is good practice.
3. The current logic already handles the case where the user thread does not wait long enough for the IO thread to finish its work: it includes forceClose and the corresponding code to force-close the IO thread. So we don't have to do the same thing again explicitly in the catch clause, as you suggest.

Does that make sense?

> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.11.0.0
>            Reporter: Buğra Gedik
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code
> is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving
> the io thread running.
> The correct way of handling this is as follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt
>     this.ioThread.interrupt();
>     try {
>         this.ioThread.join();
>     } catch (InterruptedException t2) {
>         firstException.compareAndSet(null, t2);
>         log.error("Interrupted while joining ioThread", t2);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread().interrupt();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
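
For reference, points 1 to 3 of the comment above can be sketched as a small standalone program. This is only an illustration under stated assumptions, not the actual KafkaProducer code: the class `ProducerCloseSketch`, the `Worker` stand-in for the sender, and the `drainMillis` parameter are all hypothetical names made up for this example. It shows a non-blocking close request, a timed join that restores the interrupt status when interrupted, and a force close as the fallback when the IO thread is still alive.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ProducerCloseSketch {

    // Hypothetical stand-in for the producer's sender; not Kafka's Sender class.
    static final class Worker implements Runnable {
        private volatile boolean closing = false;
        private volatile boolean forced = false;
        private final long drainMillis; // how long a graceful shutdown takes

        Worker(long drainMillis) {
            this.drainMillis = drainMillis;
        }

        @Override
        public void run() {
            // Normal operation until a close is requested.
            while (!closing) {
                sleepQuietly(5);
            }
            // Graceful drain phase, cut short by a force close.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(drainMillis);
            while (!forced && System.nanoTime() < deadline) {
                sleepQuietly(5);
            }
        }

        // Only sets a flag, so it cannot block or be interrupted (point 1).
        void initiateClose() {
            closing = true;
        }

        void forceClose() {
            forced = true;
            closing = true;
        }

        private static void sleepQuietly(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // Ignored in this sketch: the worker stops only via its flags.
            }
        }
    }

    // Returns the first exception seen while closing, or null if there was none.
    static Throwable close(Worker worker, Thread ioThread, long timeout, TimeUnit unit) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        if (timeout > 0) { // join(0) would wait forever, so skip the timed join
            worker.initiateClose();
            try {
                ioThread.join(unit.toMillis(timeout));
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, e);
                // Restore the caller's interrupt status (point 2).
                Thread.currentThread().interrupt();
            }
        }
        // Fallback for a timed-out or interrupted join (point 3).
        if (ioThread.isAlive()) {
            worker.forceClose();
        }
        return firstException.get();
    }

    // Runs the interrupted-close scenario and reports:
    // [0] an InterruptedException was recorded, [1] the interrupt flag was
    // restored, [2] the IO thread terminated after the force close.
    static boolean[] demo() {
        Worker worker = new Worker(10_000);
        Thread ioThread = new Thread(worker, "sketch-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();

        Thread.currentThread().interrupt(); // simulate an interrupt before the join
        Throwable error = close(worker, ioThread, 5, TimeUnit.SECONDS);
        boolean interruptRestored = Thread.interrupted(); // reads and clears the flag

        try {
            ioThread.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {
            error instanceof InterruptedException, interruptRestored, !ioThread.isAlive()
        };
    }
}
```

In this sketch the interrupted caller does not wait for the IO thread at all: the force close is issued immediately, and the caller keeps its interrupt status so code further up the stack can react to it. Whether the caller should additionally block in an untimed join, as the issue description proposes, is the design question under discussion here.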