This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee0c2ce  MINOR: Fix logged timeout in KafkaProducer.close() (#4623)
ee0c2ce is described below

commit ee0c2cee214b3d46f83562467bb1e4f5165f172c
Author: wangshao <[email protected]>
AuthorDate: Sat Jul 21 18:13:58 2018 +0800

    MINOR: Fix logged timeout in KafkaProducer.close() (#4623)
    
    The log line says `ms`, but the actual value could represent a
    different time unit depending on what the user provided.
    
    Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>
---
 .../org/apache/kafka/clients/producer/KafkaProducer.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3a6717b..cb52941 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1078,21 +1078,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (timeout < 0)
             throw new IllegalArgumentException("The timeout cannot be 
negative.");
 
-        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", 
timeUnit.toMillis(timeout));
+        long timeoutMs = timeUnit.toMillis(timeout);
+        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", 
timeoutMs);
+
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
-        if (timeout > 0) {
+        if (timeoutMs > 0) {
             if (invokedFromCallback) {
                 log.warn("Overriding close timeout {} ms to 0 ms in order to 
prevent useless blocking due to self-join. " +
-                        "This means you have incorrectly invoked close with a 
non-zero timeout from the producer call-back.", timeout);
+                        "This means you have incorrectly invoked close with a 
non-zero timeout from the producer call-back.",
+                        timeoutMs);
             } else {
                 // Try to close gracefully.
                 if (this.sender != null)
                     this.sender.initiateClose();
                 if (this.ioThread != null) {
                     try {
-                        this.ioThread.join(timeUnit.toMillis(timeout));
+                        this.ioThread.join(timeoutMs);
                     } catch (InterruptedException t) {
                         firstException.compareAndSet(null, new 
InterruptException(t));
                         log.error("Interrupted while joining ioThread", t);
@@ -1103,7 +1106,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         if (this.sender != null && this.ioThread != null && 
this.ioThread.isAlive()) {
             log.info("Proceeding to force close the producer since pending 
requests could not be completed " +
-                    "within timeout {} ms.", timeout);
+                    "within timeout {} ms.", timeoutMs);
             this.sender.forceClose();
             // Only join the sender thread when not calling from callback.
             if (!invokedFromCallback) {

Reply via email to