[ 
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869382#comment-15869382
 ] 

huxi commented on KAFKA-4767:
-----------------------------

I think I might get the point of what you mean. What you really concerns is 
that IO thread might got failed to be shut down or leave an inconsistent state 
if the user thread was interrupted. Am I right?

1. That `this.sender.initiateClose()` is not interruptible which means user 
thread could always be able to initiate a close to the IO thread even after we 
interrupt the user thread somewhere.
2. I do agree to restore the interruption status of the user thread after 
catching the InterruptedException, which is a really good practice.
3. The current logic already considers the situation where user thread does not 
wait enough time to have the IO thread finish its work, so it adds forceClose 
and corresponding code to force close the IO thread. In this case, we don't 
have to explicitly do the same thing again in the catch clause like what you 
suggest.

Do they make any senses?

> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.11.0.0
>            Reporter: Buğra Gedik
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code 
> is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving 
> the io thread running. The correct way of handling this is a follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt
>     this.ioThread.interrupt();
>     try { 
>          this.ioThread.join();
>     } catch (InterruptedException t) {
>         firstException.compareAndSet(null, t);
>         log.error("Interrupted while joining ioThread", t);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread.interrupt();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to