radai rosenblatt created KAFKA-9998:
---------------------------------------

             Summary: KafkaProducer.close(timeout) still may block indefinitely
                 Key: KAFKA-9998
                 URL: https://issues.apache.org/jira/browse/KAFKA-9998
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.4.1
            Reporter: radai rosenblatt


Looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

    // this will keep track of the first encountered exception
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs);    // <---- GRACEFUL JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }

    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join();   // <----- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
...
}

{code}
Specifically, in our case the ioThread was running a (very) long-running user-provided callback, which prevented the producer from closing within the given timeout.
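The blocking can be reproduced without Kafka. The sketch below is a simplified, hypothetical model of the two-phase shutdown quoted above, using only java.lang.Thread: SimulatedSender, its forceClose() flag and the sleeping "callback" are stand-ins, not Kafka classes. Because the callback never consults the force-close flag, the unbounded join() waits for the whole callback, so the close sequence takes about as long as the callback instead of the requested timeout.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of KafkaProducer.close(timeout); not Kafka code.
public class UnboundedJoinDemo {

    // Stand-in for the Sender: runs a single long user callback on its own thread.
    static final class SimulatedSender implements Runnable {
        private volatile boolean forceClose = false;
        private final long callbackMs;

        SimulatedSender(long callbackMs) {
            this.callbackMs = callbackMs;
        }

        // Only sets a flag; nothing interrupts the callback that is already running.
        void forceClose() {
            forceClose = true;
        }

        @Override
        public void run() {
            // The user callback ignores forceClose, so setting the flag cannot shorten it.
            try {
                Thread.sleep(callbackMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs the graceful-join / force-close / unbounded-join sequence and returns its duration in ms. */
    static long closeElapsedMs(long timeoutMs, long callbackMs) {
        SimulatedSender sender = new SimulatedSender(callbackMs);
        Thread ioThread = new Thread(sender, "simulated-io-thread");
        ioThread.start();

        long start = System.nanoTime();
        try {
            ioThread.join(timeoutMs);      // graceful join, bounded by timeoutMs
            if (ioThread.isAlive()) {
                sender.forceClose();       // has no effect on the running callback
                ioThread.join();           // unbounded join: waits for the callback to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long elapsed = closeElapsedMs(100, 1_000);
        System.out.println("close with a 100 ms timeout returned after ~" + elapsed + " ms");
    }
}
```

With a 100 ms timeout and a 1 s callback, the printed duration is close to the callback length, not the timeout.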

 

I think the second join() call should either be _VERY_ short (since we're already past the timeout at that stage) or should not happen at all.
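One possible shape for the suggestion, sketched under the same simplified model: after the force-close signal, join only for whatever is left of the caller's budget plus an optional small grace period, and return even if the I/O thread is still busy. The class BoundedCloseSketch, the method closeWithDeadline and the graceMs parameter are hypothetical illustrations, not the actual Kafka fix. Note that Thread.join(0) means "wait forever", so a zero wait must be skipped rather than passed to join.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded close; not the actual Kafka implementation.
public class BoundedCloseSketch {

    /**
     * Waits at most timeoutMs in total, plus graceMs after the force-close signal.
     * Returns true if ioThread has terminated when the method returns.
     */
    static boolean closeWithDeadline(Thread ioThread, Runnable forceClose,
                                     long timeoutMs, long graceMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            // Graceful phase. join(0) would block forever, so skip it for a zero timeout.
            if (timeoutMs > 0) {
                ioThread.join(timeoutMs);
            }
            if (ioThread.isAlive()) {
                forceClose.run();  // ask the I/O thread to stop
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                long waitMs = Math.max(remainingMs, 0) + graceMs;
                if (waitMs > 0) {
                    ioThread.join(waitMs);  // bounded join, never join() / join(0)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !ioThread.isAlive();
    }

    public static void main(String[] args) {
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(5_000);  // simulated long-running user callback
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "simulated-io-thread");
        ioThread.setDaemon(true);  // lets the JVM exit while the callback is still running
        ioThread.start();

        long start = System.nanoTime();
        boolean finished = closeWithDeadline(ioThread, () -> { }, 100, 50);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("finished=" + finished + " after ~" + elapsedMs + " ms");
    }
}
```

Setting graceMs to 0 corresponds to the "should not happen at all" option once the budget is spent, while a small positive value corresponds to the "_VERY_ short" option. The trade-off in either case is that close may return while the I/O thread is still running the user callback.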



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
