This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fc7136  KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
7fc7136 is described below

commit 7fc7136ffd28245629f00870fc7f7cbe1fe1c6ce
Author: Yu Yang <[email protected]>
AuthorDate: Thu Jul 26 09:13:50 2018 -0700

    KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
    
    Co-authored-by: Sumant Tambe <[email protected]>
    Co-authored-by: Yu Yang <[email protected]>
    
    Reviewers: Ted Yu <[email protected]>, Apurva Mehta <[email protected]>, Jason Gustafson <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  58 ++--
 .../kafka/clients/producer/ProducerConfig.java     |  24 +-
 .../clients/producer/internals/ProducerBatch.java  |  95 +++---
 .../producer/internals/RecordAccumulator.java      | 318 +++++++++++----------
 .../kafka/clients/producer/internals/Sender.java   | 188 ++++++++----
 .../apache/kafka/common/config/AbstractConfig.java |   2 +-
 .../org/apache/kafka/common/config/ConfigDef.java  |  11 +-
 .../java/org/apache/kafka/clients/MockClient.java  |   5 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../producer/internals/ProducerBatchTest.java      |  32 +--
 .../producer/internals/RecordAccumulatorTest.java  | 248 ++++++++++++----
 .../clients/producer/internals/SenderTest.java     | 291 +++++++++++++++----
 .../producer/internals/TransactionManagerTest.java |   6 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |   1 +
 .../kafka/api/BaseProducerSendTest.scala           |  12 +-
 .../kafka/api/PlaintextConsumerTest.scala          |   4 +-
 .../kafka/api/PlaintextProducerSendTest.scala      |   6 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |   6 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   4 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   9 +-
 docs/upgrade.html                                  |   5 +
 22 files changed, 900 insertions(+), 431 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3991467..b40b09a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.KafkaClient;
@@ -24,6 +35,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.producer.internals.BufferPool;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetrics;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -69,18 +81,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import static 
org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
 
 /**
@@ -235,6 +235,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
     public static final String NETWORK_THREAD_PREFIX = 
"kafka-producer-network-thread";
+    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
 
     private final String clientId;
     // Visible for testing
@@ -392,18 +393,21 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             int retries = configureRetries(config, transactionManager != null, 
log);
             int maxInflightRequests = configureInflightRequests(config, 
transactionManager != null);
             short acks = configureAcks(config, transactionManager != null, 
log);
+            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
 
             this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
-                    this.totalMemorySize,
                     this.compressionType,
-                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
+                    deliveryTimeoutMs,
                     metrics,
+                    PRODUCER_METRIC_GROUP_NAME,
                     time,
                     apiVersions,
-                    transactionManager);
+                    transactionManager,
+                    new BufferPool(this.totalMemorySize, 
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, 
PRODUCER_METRIC_GROUP_NAME));
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             if (metadata != null) {
                 this.metadata = metadata;
@@ -459,10 +463,30 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         }
     }
 
-    private static TransactionManager configureTransactionState(ProducerConfig 
config, LogContext logContext, Logger log) {
+    private static int configureDeliveryTimeout(ProducerConfig config, Logger 
log) {
+        int deliveryTimeoutMs = 
config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
+        int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+        int requestTimeoutMs = 
config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < 
lingerMs + requestTimeoutMs) {
+            if 
(config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
+                // throw an exception if the user explicitly set an 
inconsistent value
+                throw new 
ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
+                    + " should be equal to or larger than " + 
ProducerConfig.LINGER_MS_CONFIG
+                    + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            } else {
+                // override deliveryTimeoutMs default value to lingerMs + 
requestTimeoutMs for backward compatibility
+                deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+                log.warn("{} should be equal to or larger than {} + {}. 
Setting it to {}.",
+                    ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
ProducerConfig.LINGER_MS_CONFIG,
+                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
deliveryTimeoutMs);
+            }
+        }
+        return deliveryTimeoutMs;
+    }
 
+    private static TransactionManager configureTransactionState(ProducerConfig 
config, LogContext logContext, Logger log) {
         TransactionManager transactionManager = null;
-
         boolean userConfiguredIdempotence = false;
         if 
(config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
             userConfiguredIdempotence = true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8e7b662..ab55353 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -99,6 +99,19 @@ public class ProducerConfig extends AbstractConfig {
                                                 + "specified time waiting for 
more records to show up. This setting defaults to 0 (i.e. no delay). Setting 
<code>" + LINGER_MS_CONFIG + "=5</code>, "
                                                 + "for example, would have the 
effect of reducing the number of requests sent but would add up to 5ms of 
latency to records sent in the absence of load.";
 
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+        + " This should be larger than replica.lag.time.max.ms (a broker 
configuration)"
+        + " to reduce the possibility of message duplication due to 
unnecessary producer retries.";
+
+    /** <code>delivery.timeout.ms</code> */
+    public static final String DELIVERY_TIMEOUT_MS_CONFIG = 
"delivery.timeout.ms";
+    private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on 
the time to report success or failure after Producer.send() returns. "
+                                                          + "Producer may 
report failure to send a message earlier than this config if all the retries 
are exhausted or "
+                                                          + "a record is added 
to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be 
equal to or "
+                                                          + "greater than " + 
REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
+
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = 
CommonClientConfigs.CLIENT_ID_CONFIG;
 
@@ -188,12 +201,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class 
that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> 
interface.";
 
-    /** <code>request.timeout.ms</code> */
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
-                                                        + " This should be 
larger than replica.lag.time.max.ms (a broker configuration)"
-                                                        + " to reduce the 
possibility of message duplication due to unnecessary producer retries.";
-
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = 
"interceptor.classes";
     public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to 
use as interceptors. "
@@ -224,7 +231,7 @@ public class ProducerConfig extends AbstractConfig {
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 
1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
-                                .define(RETRIES_CONFIG, Type.INT, 0, 
between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
+                                .define(RETRIES_CONFIG, Type.INT, 
Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
@@ -233,7 +240,8 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(LINGER_MS_CONFIG, Type.LONG, 0, 
atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(LINGER_MS_CONFIG, Type.INT, 0, 
atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 
120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", 
Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 
1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 
1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index ea0f0f7..6e08185 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -77,13 +76,13 @@ public final class ProducerBatch {
     private boolean retry;
     private boolean reopened = false;
 
-    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder 
recordsBuilder, long now) {
-        this(tp, recordsBuilder, now, false);
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder 
recordsBuilder, long createdMs) {
+        this(tp, recordsBuilder, createdMs, false);
     }
 
-    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder 
recordsBuilder, long now, boolean isSplitBatch) {
-        this.createdMs = now;
-        this.lastAttemptMs = now;
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder 
recordsBuilder, long createdMs, boolean isSplitBatch) {
+        this.createdMs = createdMs;
+        this.lastAttemptMs = createdMs;
         this.recordsBuilder = recordsBuilder;
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
@@ -158,7 +157,17 @@ public final class ProducerBatch {
     }
 
     /**
-     * Complete the request. If the batch was previously aborted, this is a 
no-op.
+     * Finalize the state of a batch. Final state, once set, is immutable. 
This function may be called
+     * once or twice on a batch. It may be called twice if
+     * 1. An inflight batch expires before a response from the broker is 
received. The batch's final
+     * state is set to FAILED. But it could succeed on the broker and second 
time around batch.done() may
+     * try to set SUCCEEDED final state.
+     * 2. If a transaction abortion happens or if the producer is closed 
forcefully, the final state is
+     * ABORTED but again it could succeed if broker responds with a success.
+     *
+     * Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
+     * Attempted transitions from one failure state to the same or a different 
failed state are ignored.
+     * Attempted transitions from SUCCEEDED to the same or a failed state 
throw an exception.
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being 
used
@@ -166,26 +175,34 @@ public final class ProducerBatch {
      * @return true if the batch was completed successfully and false if the 
batch was previously aborted
      */
     public boolean done(long baseOffset, long logAppendTime, RuntimeException 
exception) {
-        final FinalState finalState;
-        if (exception == null) {
+        final FinalState tryFinalState = (exception == null) ? 
FinalState.SUCCEEDED : FinalState.FAILED;
+
+        if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset 
{}.", topicPartition, baseOffset);
-            finalState = FinalState.SUCCEEDED;
         } else {
-            log.trace("Failed to produce messages to {}.", topicPartition, 
exception);
-            finalState = FinalState.FAILED;
+            log.trace("Failed to produce messages to {} with base offset {}.", 
topicPartition, baseOffset, exception);
+        }
+
+        if (this.finalState.compareAndSet(null, tryFinalState)) {
+            completeFutureAndFireCallbacks(baseOffset, logAppendTime, 
exception);
+            return true;
         }
 
-        if (!this.finalState.compareAndSet(null, finalState)) {
-            if (this.finalState.get() == FinalState.ABORTED) {
-                log.debug("ProduceResponse returned for {} after batch had 
already been aborted.", topicPartition);
-                return false;
+        if (this.finalState.get() != FinalState.SUCCEEDED) {
+            if (tryFinalState == FinalState.SUCCEEDED) {
+                // Log if a previously unsuccessful batch succeeded later on.
+                log.debug("ProduceResponse returned {} for {} after batch with 
base offset {} had already been {}.",
+                    tryFinalState, topicPartition, baseOffset, 
this.finalState.get());
             } else {
-                throw new IllegalStateException("Batch has already been 
completed in final state " + this.finalState.get());
+                // FAILED --> FAILED and ABORTED --> FAILED transitions are 
ignored.
+                log.debug("Ignored state transition {} -> {} for {} batch with 
base offset {}",
+                    this.finalState.get(), tryFinalState, topicPartition, 
baseOffset);
             }
+        } else {
+            // A SUCCESSFUL batch must not attempt another state change.
+            throw new IllegalStateException("A " + this.finalState.get() + " 
batch must not attempt another state change to " + tryFinalState);
         }
-
-        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
-        return true;
+        return false;
     }
 
     private void completeFutureAndFireCallbacks(long baseOffset, long 
logAppendTime, RuntimeException exception) {
@@ -299,37 +316,12 @@ public final class ProducerBatch {
         return "ProducerBatch(topicPartition=" + topicPartition + ", 
recordCount=" + recordCount + ")";
     }
 
-    /**
-     * A batch whose metadata is not available should be expired if one of the 
following is true:
-     * <ol>
-     *     <li> the batch is not in retry AND request timeout has elapsed 
after it is ready (full or linger.ms has reached).
-     *     <li> the batch is in retry AND request timeout has elapsed after 
the backoff period ended.
-     * </ol>
-     * This methods closes this batch and sets {@code expiryErrorMessage} if 
the batch has timed out.
-     */
-    boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, 
long lingerMs, boolean isFull) {
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - 
this.lastAppendTime))
-            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed 
since last append";
-        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - 
lingerMs))
-            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has 
passed since batch creation plus linger time";
-        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - 
retryBackoffMs))
-            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms 
has passed since last attempt plus backoff time";
-
-        boolean expired = expiryErrorMessage != null;
-        if (expired)
-            abortRecordAppends();
-        return expired;
+    boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
+        return deliveryTimeoutMs <= now - this.createdMs;
     }
 
-    /**
-     * If {@link #maybeExpire(int, long, long, long, boolean)} returned true, 
the sender will fail the batch with
-     * the exception returned by this method.
-     * @return An exception indicating the batch expired.
-     */
-    TimeoutException timeoutException() {
-        if (expiryErrorMessage == null)
-            throw new IllegalStateException("Batch has not expired");
-        return new TimeoutException("Expiring " + recordCount + " record(s) 
for " + topicPartition + ": " + expiryErrorMessage);
+    public FinalState finalState() {
+        return this.finalState.get();
     }
 
     int attempts() {
@@ -347,10 +339,6 @@ public final class ProducerBatch {
         return drainedMs - createdMs;
     }
 
-    long createdTimeMs(long nowMs) {
-        return Math.max(0, nowMs - createdMs);
-    }
-
     long waitedTimeMs(long nowMs) {
         return Math.max(0, nowMs - lastAttemptMs);
     }
@@ -467,5 +455,4 @@ public final class ProducerBatch {
     public boolean sequenceHasBeenReset() {
         return reopened;
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 31c6d75..964ac3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,18 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
@@ -34,10 +46,10 @@ import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.LogContext;
@@ -45,20 +57,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * This class acts as a queue that accumulates records into {@link 
MemoryRecords}
  * instances to be sent to the server.
@@ -76,6 +74,7 @@ public final class RecordAccumulator {
     private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
+    private final long deliveryTimeoutMs;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
@@ -85,13 +84,13 @@ public final class RecordAccumulator {
     private final Map<TopicPartition, Long> muted;
     private int drainIndex;
     private final TransactionManager transactionManager;
+    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time 
(absolute) a batch will expire.
 
     /**
      * Create a new record accumulator
      *
      * @param logContext The log context used for logging
      * @param batchSize The size to use when allocating {@link MemoryRecords} 
instances
-     * @param totalSize The maximum memory the record accumulator can use.
      * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a 
records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a 
non-zero lingerMs will trade off some
@@ -106,14 +105,16 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
-                             long totalSize,
                              CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
+                             long deliveryTimeoutMs,
                              Metrics metrics,
+                             String metricGrpName,
                              Time time,
                              ApiVersions apiVersions,
-                             TransactionManager transactionManager) {
+                             TransactionManager transactionManager,
+                             BufferPool bufferPool) {
         this.log = logContext.logger(RecordAccumulator.class);
         this.drainIndex = 0;
         this.closed = false;
@@ -123,9 +124,9 @@ public final class RecordAccumulator {
         this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
+        this.deliveryTimeoutMs = deliveryTimeoutMs;
         this.batches = new CopyOnWriteMap<>();
-        String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName);
+        this.free = bufferPool;
         this.incomplete = new IncompleteBatches();
         this.muted = new HashMap<>();
         this.time = time;
@@ -227,7 +228,6 @@ public final class RecordAccumulator {
 
                 // Don't deallocate this buffer in the finally block as it's 
being used in the record batch
                 buffer = null;
-
                 return new RecordAppendResult(future, dq.size() > 1 || 
batch.isFull(), true);
             }
         } finally {
@@ -240,7 +240,7 @@ public final class RecordAccumulator {
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte 
maxUsableMagic) {
         if (transactionManager != null && maxUsableMagic < 
RecordBatch.MAGIC_VALUE_V2) {
             throw new UnsupportedVersionException("Attempting to use 
idempotence with a broker which does not " +
-                    "support the required message format (v2). The broker must 
be version 0.11 or later.");
+                "support the required message format (v2). The broker must be 
version 0.11 or later.");
         }
         return MemoryRecords.builder(buffer, maxUsableMagic, compression, 
TimestampType.CREATE_TIME, 0L);
     }
@@ -273,37 +273,35 @@ public final class RecordAccumulator {
         return result;
     }
 
+    public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
+        if (batch.createdMs + deliveryTimeoutMs  > 0) {
+            // the non-negative check is to guard us against potential 
overflow due to setting
+            // a large value for deliveryTimeoutMs
+            nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, 
batch.createdMs + deliveryTimeoutMs);
+        } else {
+            log.warn("Skipping next batch expiry time update due to addition 
overflow: "
+                + "batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, 
deliveryTimeoutMs);
+        }
+    }
+
     /**
      * Get a list of batches which have been sitting in the accumulator too 
long and need to be expired.
      */
-    public List<ProducerBatch> expiredBatches(int requestTimeout, long now) {
+    public List<ProducerBatch> expiredBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : 
this.batches.entrySet()) {
-            Deque<ProducerBatch> dq = entry.getValue();
-            TopicPartition tp = entry.getKey();
-            // We only check if the batch should be expired if the partition 
does not have a batch in flight.
-            // This is to prevent later batches from being expired while an 
earlier batch is still in progress.
-            // Note that `muted` is only ever populated if 
`max.in.flight.request.per.connection=1` so this protection
-            // is only active in this case. Otherwise the expiration order is 
not guaranteed.
-            if (!isMuted(tp, now)) {
-                synchronized (dq) {
-                    // iterate over the batches and expire them if they have 
been in the accumulator for more than requestTimeOut
-                    ProducerBatch lastBatch = dq.peekLast();
-                    Iterator<ProducerBatch> batchIterator = dq.iterator();
-                    while (batchIterator.hasNext()) {
-                        ProducerBatch batch = batchIterator.next();
-                        boolean isFull = batch != lastBatch || batch.isFull();
-                        // Check if the batch has expired. Expired batches are 
closed by maybeExpire, but callbacks
-                        // are invoked after completing the iterations, since 
sends invoked from callbacks
-                        // may append more batches to the deque being 
iterated. The batch is deallocated after
-                        // callbacks are invoked.
-                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, 
now, this.lingerMs, isFull)) {
-                            expiredBatches.add(batch);
-                            batchIterator.remove();
-                        } else {
-                            // Stop at the first batch that has not expired.
-                            break;
-                        }
+            // expire the batches in the order of sending
+            Deque<ProducerBatch> deque = entry.getValue();
+            synchronized (deque) {
+                while (!deque.isEmpty()) {
+                    ProducerBatch batch = deque.getFirst();
+                    if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, 
now)) {
+                        deque.poll();
+                        batch.abortRecordAppends();
+                        expiredBatches.add(batch);
+                    } else {
+                        maybeUpdateNextBatchExpiryTime(batch);
+                        break;
                     }
                 }
             }
@@ -311,8 +309,13 @@ public final class RecordAccumulator {
         return expiredBatches;
     }
 
+    public long getDeliveryTimeoutMs() {
+        return deliveryTimeoutMs;
+    }
+
     /**
-     * Re-enqueue the given record batch in the accumulator to retry
+     * Re-enqueue the given record batch in the accumulator. In 
Sender.completeBatch method, we check
+     * whether the batch has reached deliveryTimeoutMs or not. Hence we do not 
do the delivery timeout check here.
      */
     public void reenqueue(ProducerBatch batch, long now) {
         batch.reenqueued(now);
@@ -356,8 +359,8 @@ public final class RecordAccumulator {
     }
 
     // We will have to do extra work to ensure the queue is in order when 
requests are being retried and there are
-    // multiple requests in flight to that partition. If the first inflight 
request fails to append, then all the subsequent
-    // in flight requests will also fail because the sequence numbers will not 
be accepted.
+    // multiple requests in flight to that partition. If the first in flight 
request fails to append, then all the
+    // subsequent in flight requests will also fail because the sequence 
numbers will not be accepted.
     //
     // Further, once batches are being retried, we are reduced to a single in 
flight request for that partition. So when
     // the subsequent batches come back in sequence order, they will have to 
be placed further back in the queue.
@@ -368,12 +371,12 @@ public final class RecordAccumulator {
     private void insertInSequenceOrder(Deque<ProducerBatch> deque, 
ProducerBatch batch) {
         // When we are requeing and have enabled idempotence, the reenqueued 
batch must always have a sequence.
         if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
-            throw new IllegalStateException("Trying to reenqueue a batch which 
doesn't have a sequence even " +
-                    "though idempotence is enabled.");
+            throw new IllegalStateException("Trying to re-enqueue a batch 
which doesn't have a sequence even " +
+                "though idempotency is enabled.");
 
         if (transactionManager.nextBatchBySequence(batch.topicPartition) == 
null)
-            throw new IllegalStateException("We are reenqueueing a batch which 
is not tracked as part of the in flight " +
-                    "requests. batch.topicPartition: " + batch.topicPartition 
+ "; batch.baseSequence: " + batch.baseSequence());
+            throw new IllegalStateException("We are re-enqueueing a batch 
which is not tracked as part of the in flight " +
+                "requests. batch.topicPartition: " + batch.topicPartition + "; 
batch.baseSequence: " + batch.baseSequence());
 
         ProducerBatch firstBatchInQueue = deque.peekFirst();
         if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
@@ -390,7 +393,7 @@ public final class RecordAccumulator {
                 orderedBatches.add(deque.pollFirst());
 
             log.debug("Reordered incoming batch with sequence {} for partition 
{}. It was placed in the queue at " +
-                    "position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
+                "position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
             // Either we have reached a point where there are batches without 
a sequence (ie. never been drained
             // and are hence in order by default), or the batch at the front 
of the queue has a sequence greater
             // than the incoming batch. This is the right place to add the 
incoming batch.
@@ -466,7 +469,6 @@ public final class RecordAccumulator {
                 }
             }
         }
-
         return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, 
unknownLeaderTopics);
     }
 
@@ -484,6 +486,106 @@ public final class RecordAccumulator {
         return false;
     }
 
+    private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, 
TopicPartition tp) {
+        ProducerIdAndEpoch producerIdAndEpoch = null;
+        if (transactionManager != null) {
+            if (!transactionManager.isSendToPartitionAllowed(tp))
+                return true;
+
+            producerIdAndEpoch = transactionManager.producerIdAndEpoch();
+            if (!producerIdAndEpoch.isValid())
+                // we cannot send the batch until we have refreshed the 
producer id
+                return true;
+
+            if (!first.hasSequence() && 
transactionManager.hasUnresolvedSequence(first.topicPartition))
+                // Don't drain any new batches while the state of previous 
sequence numbers
+                // is unknown. The previous batches would be unknown if they 
were aborted
+                // on the client after being sent to the broker at least once.
+                return true;
+
+            int firstInFlightSequence = 
transactionManager.firstInFlightSequence(first.topicPartition);
+            if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && 
first.hasSequence()
+                && first.baseSequence() != firstInFlightSequence)
+                // If the queued batch already has an assigned sequence, then 
it is being retried.
+                // In this case, we wait until the next immediate batch is 
ready and drain that.
+                // We only move on when the next in line batch is complete 
(either successfully or due to
+                // a fatal broker error). This effectively reduces our in 
flight request count to 1.
+                return true;
+        }
+        return false;
+    }
+
+    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node 
node, int maxSize, long now) {
+        int size = 0;
+        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+        List<ProducerBatch> ready = new ArrayList<>();
+        /* to make starvation less likely this loop doesn't start at 0 */
+        int start = drainIndex = drainIndex % parts.size();
+        do {
+            PartitionInfo part = parts.get(drainIndex);
+            TopicPartition tp = new TopicPartition(part.topic(), 
part.partition());
+            this.drainIndex = (this.drainIndex + 1) % parts.size();
+
+            // Only proceed if the partition has no in-flight batches.
+            if (isMuted(tp, now))
+                continue;
+
+            Deque<ProducerBatch> deque = getDeque(tp);
+            if (deque == null)
+                continue;
+
+            synchronized (deque) {
+                // invariant: !isMuted(tp,now) && deque != null
+                ProducerBatch first = deque.peekFirst();
+                if (first == null)
+                    continue;
+
+                // first != null
+                boolean backoff = first.attempts() > 0 && 
first.waitedTimeMs(now) < retryBackoffMs;
+                // Only drain the batch if it is not during backoff period.
+                if (backoff)
+                    continue;
+
+                if (size + first.estimatedSizeInBytes() > maxSize && 
!ready.isEmpty()) {
+                    // there is a rare case that a single batch size is larger 
than the request size due to
+                    // compression; in this case we will still eventually send 
this batch in a single request
+                    break;
+                } else {
+                    if (shouldStopDrainBatchesForPartition(first, tp))
+                        break;
+
+                    boolean isTransactional = transactionManager != null ? 
transactionManager.isTransactional() : false;
+                    ProducerIdAndEpoch producerIdAndEpoch =
+                        transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+                    ProducerBatch batch = deque.pollFirst();
+                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                        // If the batch already has an assigned sequence, then 
we should not change the producer id and
+                        // sequence number, since this may introduce 
duplicates. In particular, the previous attempt
+                        // may actually have been accepted, and if we change 
the producer id and sequence here, this
+                        // attempt will also be accepted, causing a duplicate.
+                        //
+                        // Additionally, we update the next sequence number 
bound for the partition, and also have
+                        // the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
+                        // even if we receive out of order responses.
+                        batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                        
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
+                        log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
+                                "{} being sent to partition {}", 
producerIdAndEpoch.producerId,
+                            producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
+
+                        transactionManager.addInFlightBatch(batch);
+                    }
+                    batch.close();
+                    size += batch.records().sizeInBytes();
+                    ready.add(batch);
+
+                    batch.drained(now);
+                }
+            }
+        } while (start != drainIndex);
+        return ready;
+    }
+
     /**
      * Drain all the data for the given nodes and collate them into a list of 
batches that will fit within the specified
      * size on a per-node basis. This method attempts to avoid choosing the 
same topic-node over and over.
@@ -494,106 +596,25 @@ public final class RecordAccumulator {
      * @param now The current unix time in milliseconds
      * @return A list of {@link ProducerBatch} for each node specified with 
total size less than the requested maxSize.
      */
-    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
-                                                   Set<Node> nodes,
-                                                   int maxSize,
-                                                   long now) {
+    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> 
nodes, int maxSize, long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
         Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
         for (Node node : nodes) {
-            int size = 0;
-            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
-            List<ProducerBatch> ready = new ArrayList<>();
-            /* to make starvation less likely this loop doesn't start at 0 */
-            int start = drainIndex = drainIndex % parts.size();
-            do {
-                PartitionInfo part = parts.get(drainIndex);
-                TopicPartition tp = new TopicPartition(part.topic(), 
part.partition());
-                // Only proceed if the partition has no in-flight batches.
-                if (!isMuted(tp, now)) {
-                    Deque<ProducerBatch> deque = getDeque(tp);
-                    if (deque != null) {
-                        synchronized (deque) {
-                            ProducerBatch first = deque.peekFirst();
-                            if (first != null) {
-                                boolean backoff = first.attempts() > 0 && 
first.waitedTimeMs(now) < retryBackoffMs;
-                                // Only drain the batch if it is not during 
backoff period.
-                                if (!backoff) {
-                                    if (size + first.estimatedSizeInBytes() > 
maxSize && !ready.isEmpty()) {
-                                        // there is a rare case that a single 
batch size is larger than the request size due
-                                        // to compression; in this case we 
will still eventually send this batch in a single
-                                        // request
-                                        break;
-                                    } else {
-                                        ProducerIdAndEpoch producerIdAndEpoch 
= null;
-                                        boolean isTransactional = false;
-                                        if (transactionManager != null) {
-                                            if 
(!transactionManager.isSendToPartitionAllowed(tp))
-                                                break;
-
-                                            producerIdAndEpoch = 
transactionManager.producerIdAndEpoch();
-                                            if (!producerIdAndEpoch.isValid())
-                                                // we cannot send the batch 
until we have refreshed the producer id
-                                                break;
-
-                                            isTransactional = 
transactionManager.isTransactional();
-
-                                            if (!first.hasSequence() && 
transactionManager.hasUnresolvedSequence(first.topicPartition))
-                                                // Don't drain any new batches 
while the state of previous sequence numbers
-                                                // is unknown. The previous 
batches would be unknown if they were aborted
-                                                // on the client after being 
sent to the broker at least once.
-                                                break;
-
-                                            int firstInFlightSequence = 
transactionManager.firstInFlightSequence(first.topicPartition);
-                                            if (firstInFlightSequence != 
RecordBatch.NO_SEQUENCE && first.hasSequence()
-                                                    && first.baseSequence() != 
firstInFlightSequence)
-                                                // If the queued batch already 
has an assigned sequence, then it is being
-                                                // retried. In this case, we 
wait until the next immediate batch is ready
-                                                // and drain that. We only 
move on when the next in line batch is complete (either successfully
-                                                // or due to a fatal broker 
error). This effectively reduces our
-                                                // in flight request count to 
1.
-                                                break;
-                                        }
-
-                                        ProducerBatch batch = 
deque.pollFirst();
-                                        if (producerIdAndEpoch != null && 
!batch.hasSequence()) {
-                                            // If the batch already has an 
assigned sequence, then we should not change the producer id and
-                                            // sequence number, since this may 
introduce duplicates. In particular,
-                                            // the previous attempt may 
actually have been accepted, and if we change
-                                            // the producer id and sequence 
here, this attempt will also be accepted,
-                                            // causing a duplicate.
-                                            //
-                                            // Additionally, we update the 
next sequence number bound for the partition,
-                                            // and also have the transaction 
manager track the batch so as to ensure
-                                            // that sequence ordering is 
maintained even if we receive out of order
-                                            // responses.
-                                            
batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                                            
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-                                            log.debug("Assigned producerId {} 
and producerEpoch {} to batch with base sequence " +
-                                                            "{} being sent to 
partition {}", producerIdAndEpoch.producerId,
-                                                    producerIdAndEpoch.epoch, 
batch.baseSequence(), tp);
-
-                                            
transactionManager.addInFlightBatch(batch);
-                                        }
-                                        batch.close();
-                                        size += batch.records().sizeInBytes();
-                                        ready.add(batch);
-                                        batch.drained(now);
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-                this.drainIndex = (this.drainIndex + 1) % parts.size();
-            } while (start != drainIndex);
+            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, 
maxSize, now);
             batches.put(node.id(), ready);
         }
         return batches;
     }
 
+    /**
+     * The earliest absolute time a batch will expire (in milliseconds)
+     */
+    public Long nextExpiryTimeMs() {
+        return this.nextBatchExpiryTimeMs;
+    }
+
     private Deque<ProducerBatch> getDeque(TopicPartition tp) {
         return batches.get(tp);
     }
@@ -784,5 +805,4 @@ public final class RecordAccumulator {
             this.unknownLeaderTopics = unknownLeaderTopics;
         }
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d1a7bc9..7077f15 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -34,6 +35,7 @@ import 
org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -120,6 +122,9 @@ public class Sender implements Runnable {
     /* all the state related to transactions, in particular the producer id, 
producer epoch, and sequence numbers */
     private final TransactionManager transactionManager;
 
+    // A per-partition queue of batches ordered by creation time for tracking 
the in-flight batches
+    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
+
     public Sender(LogContext logContext,
                   KafkaClient client,
                   Metadata metadata,
@@ -149,6 +154,73 @@ public class Sender implements Runnable {
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
+        this.inFlightBatches = new HashMap<>();
+    }
+
+    public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
+        return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new 
ArrayList<>();
+    }
+
+    public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+        List<ProducerBatch> batches = 
inFlightBatches.get(batch.topicPartition);
+        if (batches != null) {
+            batches.remove(batch);
+            if (batches.isEmpty()) {
+                inFlightBatches.remove(batch.topicPartition);
+            }
+        }
+    }
+
+    /**
+     *  Get the in-flight batches that has reached delivery timeout.
+     */
+    private List<ProducerBatch> getExpiredInflightBatches(long now) {
+        List<ProducerBatch> expiredBatches = new ArrayList<>();
+        for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : 
inFlightBatches.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            List<ProducerBatch> partitionInFlightBatches = entry.getValue();
+            if (partitionInFlightBatches != null) {
+                Iterator<ProducerBatch> iter = 
partitionInFlightBatches.iterator();
+                while (iter.hasNext()) {
+                    ProducerBatch batch = iter.next();
+                    if 
(batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
+                        iter.remove();
+                        // expireBatches is called in Sender.sendProducerData, 
before client.poll.
+                        // The batch.finalState() == null invariant should 
always hold. An IllegalStateException
+                        // exception will be thrown if the invariant is 
violated.
+                        if (batch.finalState() == null) {
+                            expiredBatches.add(batch);
+                        } else {
+                            throw new 
IllegalStateException(batch.topicPartition + " batch created at " +
+                                batch.createdMs + " gets unexpected final 
state " + batch.finalState());
+                        }
+                    } else {
+                        accumulator.maybeUpdateNextBatchExpiryTime(batch);
+                        break;
+                    }
+                }
+                if (partitionInFlightBatches.isEmpty())
+                    inFlightBatches.remove(topicPartition);
+            }
+        }
+        return expiredBatches;
+    }
+
+    private void addToInflightBatches(List<ProducerBatch> batches) {
+        for (ProducerBatch batch : batches) {
+            List<ProducerBatch> inflightBatchList = 
inFlightBatches.get(batch.topicPartition);
+            if (inflightBatchList == null) {
+                inflightBatchList = new ArrayList<>();
+                inFlightBatches.put(batch.topicPartition, inflightBatchList);
+            }
+            inflightBatchList.add(batch);
+        }
+    }
+
+    public void addToInflightBatches(Map<Integer, List<ProducerBatch>> 
batches) {
+        for (List<ProducerBatch> batchList : batches.values()) {
+            addToInflightBatches(batchList);
+        }
     }
 
     /**
@@ -204,12 +276,12 @@ public class Sender implements Runnable {
                 if 
(transactionManager.shouldResetProducerStateAfterResolvingSequences())
                     // Check if the previous run expired batches which 
requires a reset of the producer state.
                     transactionManager.resetProducerId();
-
                 if (!transactionManager.isTransactional()) {
                     // this is an idempotent producer, so make sure we have a 
producer id
                     maybeWaitForProducerId();
                 } else if (transactionManager.hasUnresolvedSequences() && 
!transactionManager.hasFatalError()) {
-                    transactionManager.transitionToFatalError(new 
KafkaException("The client hasn't received acknowledgment for " +
+                    transactionManager.transitionToFatalError(
+                        new KafkaException("The client hasn't received 
acknowledgment for " +
                             "some previously sent messages and can no longer 
retry them. It isn't safe to continue."));
                 } else if 
(transactionManager.hasInFlightTransactionalRequest() || 
maybeSendTransactionalRequest(now)) {
                     // as long as there are outstanding transactional 
requests, we simply wait for them to return
@@ -241,7 +313,6 @@ public class Sender implements Runnable {
 
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
-
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = 
this.accumulator.ready(cluster, now);
 
@@ -253,8 +324,8 @@ public class Sender implements Runnable {
             for (String topic : result.unknownLeaderTopics)
                 this.metadata.add(topic);
 
-            log.debug("Requesting metadata update due to unknown leader topics 
from the batched records: {}", result.unknownLeaderTopics);
-
+            log.debug("Requesting metadata update due to unknown leader topics 
from the batched records: {}",
+                result.unknownLeaderTopics);
             this.metadata.requestUpdate();
         }
 
@@ -270,8 +341,8 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<ProducerBatch>> batches = 
this.accumulator.drain(cluster, result.readyNodes,
-                this.maxRequestSize, now);
+        Map<Integer, List<ProducerBatch>> batches = 
this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+        addToInflightBatches(batches);
         if (guaranteeMessageOrder) {
             // Mute all the partitions drained
             for (List<ProducerBatch> batchList : batches.values()) {
@@ -280,27 +351,34 @@ public class Sender implements Runnable {
             }
         }
 
-        List<ProducerBatch> expiredBatches = 
this.accumulator.expiredBatches(this.requestTimeoutMs, now);
+        List<ProducerBatch> expiredInflightBatches = 
getExpiredInflightBatches(now);
+        List<ProducerBatch> expiredBatches = 
this.accumulator.expiredBatches(now);
+        expiredBatches.addAll(expiredInflightBatches);
+
         // Reset the producer id if an expired batch has previously been sent 
to the broker. Also update the metrics
         // for expired batches. see the documentation of 
@TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", 
expiredBatches.size());
         for (ProducerBatch expiredBatch : expiredBatches) {
-            failBatch(expiredBatch, -1, NO_TIMESTAMP, 
expiredBatch.timeoutException(), false);
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " 
record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since 
batch creation";
+            failBatch(expiredBatch, -1, NO_TIMESTAMP, new 
TimeoutException(errorMessage), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
                 // This ensures that no new batches are drained until the 
current in flight batches are fully resolved.
                 
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
             }
         }
-
         sensors.updateProduceRequestMetrics(batches);
 
         // If we have any nodes that are ready to send + have sendable data, 
poll with 0 timeout so this can immediately
-        // loop and try sending more data. Otherwise, the timeout is 
determined by nodes that have partitions with data
-        // that isn't yet sendable (e.g. lingering, backing off). Note that 
this specifically does not include nodes
-        // with sendable data that aren't ready to send since they would cause 
busy looping.
+        // loop and try sending more data. Otherwise, the timeout will be the 
smaller value between next batch expiry
+        // time, and the delay time for checking data availability. Note that 
the nodes may have data that isn't yet
+        // sendable due to lingering, backing off, etc. This specifically does 
not include nodes with sendable data
+        // that aren't ready to send since they would cause busy looping.
         long pollTimeout = Math.min(result.nextReadyCheckDelayMs, 
notReadyTimeout);
+        pollTimeout = Math.min(pollTimeout, 
this.accumulator.nextExpiryTimeMs() - now);
+        pollTimeout = Math.max(pollTimeout, 0);
         if (!result.readyNodes.isEmpty()) {
             log.trace("Nodes with data ready to send: {}", result.readyNodes);
             // if some partitions are already ready to be sent, the select 
time would be 0;
@@ -310,7 +388,6 @@ public class Sender implements Runnable {
             pollTimeout = 0;
         }
         sendProduceRequests(batches, now);
-
         return pollTimeout;
     }
 
@@ -318,7 +395,6 @@ public class Sender implements Runnable {
         if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
             if (transactionManager.isAborting())
                 accumulator.abortUndrainedBatches(new KafkaException("Failing 
batch since transaction was aborted"));
-
             // There may still be requests left which are being retried. Since 
we do not know whether they had
             // been successfully appended to the broker log, we must resend 
them until their final status is clear.
             // If they had been appended and we did not receive the error, 
then our sequence number would no longer
@@ -341,7 +417,6 @@ public class Sender implements Runnable {
                         
transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
                     }
-
                     if (!NetworkClientUtils.awaitReady(client, targetNode, 
time, requestTimeoutMs)) {
                         
transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
@@ -353,12 +428,10 @@ public class Sender implements Runnable {
                 if (targetNode != null) {
                     if (nextRequestHandler.isRetry())
                         time.sleep(nextRequestHandler.retryBackoffMs());
-
-                    ClientRequest clientRequest = 
client.newClientRequest(targetNode.idString(),
-                            requestBuilder, now, true, requestTimeoutMs, 
nextRequestHandler);
+                    ClientRequest clientRequest = client.newClientRequest(
+                        targetNode.idString(), requestBuilder, now, true, 
requestTimeoutMs, nextRequestHandler);
                     
transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
                     log.debug("Sending transactional request {} to node {}", 
requestBuilder, targetNode);
-
                     client.send(clientRequest, now);
                     return true;
                 }
@@ -371,11 +444,9 @@ public class Sender implements Runnable {
                     break;
                 }
             }
-
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
-
         transactionManager.retry(nextRequestHandler);
         return true;
     }
@@ -442,8 +513,7 @@ public class Sender implements Runnable {
                         break;
                     }
                 } else {
-                    log.debug("Could not find an available broker to send 
InitProducerIdRequest to. " +
-                            "We will back off and try again.");
+                    log.debug("Could not find an available broker to send 
InitProducerIdRequest to. Will back off and retry.");
                 }
             } catch (UnsupportedVersionException e) {
                 transactionManager.transitionToFatalError(e);
@@ -466,7 +536,7 @@ public class Sender implements Runnable {
         int correlationId = requestHeader.correlationId();
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being 
disconnected",
-                    requestHeader, response.destination());
+                requestHeader, response.destination());
             for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new 
ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, 
now, 0L);
         } else if (response.versionMismatch() != null) {
@@ -511,23 +581,25 @@ public class Sender implements Runnable {
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || 
batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the 
split batches again. We do not decrement
             // the retry attempts in this case.
-            log.warn("Got error produce response in correlation id {} on 
topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
-                     correlationId,
-                     batch.topicPartition,
-                     this.retries - batch.attempts(),
-                     error);
+            log.warn(
+                "Got error produce response in correlation id {} on 
topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
+                correlationId,
+                batch.topicPartition,
+                this.retries - batch.attempts(),
+                error);
             if (transactionManager != null)
                 transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
             this.accumulator.deallocate(batch);
             this.sensors.recordBatchSplit();
         } else if (error != Errors.NONE) {
-            if (canRetry(batch, response)) {
-                log.warn("Got error produce response with correlation id {} on 
topic-partition {}, retrying ({} attempts left). Error: {}",
-                        correlationId,
-                        batch.topicPartition,
-                        this.retries - batch.attempts() - 1,
-                        error);
+            if (canRetry(batch, response, now)) {
+                log.warn(
+                    "Got error produce response with correlation id {} on 
topic-partition {}, retrying ({} attempts left). Error: {}",
+                    correlationId,
+                    batch.topicPartition,
+                    this.retries - batch.attempts() - 1,
+                    error);
                 if (transactionManager == null) {
                     reenqueueBatch(batch, now);
                 } else if 
(transactionManager.hasProducerIdAndEpoch(batch.producerId(), 
batch.producerEpoch())) {
@@ -564,14 +636,14 @@ public class Sender implements Runnable {
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof 
UnknownTopicOrPartitionException) {
                     log.warn("Received unknown topic or partition error in 
produce request on partition {}. The " +
-                            "topic/partition may not exist or the user may not 
have Describe access to it", batch.topicPartition);
+                            "topic-partition may not exist or the user may not 
have Describe access to it",
+                        batch.topicPartition);
                 } else {
                     log.warn("Received invalid metadata error in produce 
request on partition {} due to {}. Going " +
                             "to request metadata update now", 
batch.topicPartition, error.exception().toString());
                 }
                 metadata.requestUpdate();
             }
-
         } else {
             completeBatch(batch, response);
         }
@@ -583,35 +655,43 @@ public class Sender implements Runnable {
 
     private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
         this.accumulator.reenqueue(batch, currentTimeMs);
+        maybeRemoveFromInflightBatches(batch);
         this.sensors.recordRetries(batch.topicPartition.topic(), 
batch.recordCount);
     }
 
     private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionResponse response) {
         if (transactionManager != null) {
             if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), 
batch.producerEpoch())) {
-                
transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, 
batch.baseSequence() + batch.recordCount - 1);
-                log.debug("ProducerId: {}; Set last ack'd sequence number for 
topic-partition {} to {}", batch.producerId(), batch.topicPartition,
-                        
transactionManager.lastAckedSequence(batch.topicPartition));
+                transactionManager
+                    .maybeUpdateLastAckedSequence(batch.topicPartition, 
batch.baseSequence() + batch.recordCount - 1);
+                log.debug("ProducerId: {}; Set last ack'd sequence number for 
topic-partition {} to {}",
+                    batch.producerId(),
+                    batch.topicPartition,
+                    
transactionManager.lastAckedSequence(batch.topicPartition));
             }
             transactionManager.updateLastAckedOffset(response, batch);
             transactionManager.removeInFlightBatch(batch);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null))
+        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+            maybeRemoveFromInflightBatches(batch);
             this.accumulator.deallocate(batch);
+        }
     }
 
-    private void failBatch(ProducerBatch batch, 
ProduceResponse.PartitionResponse response, RuntimeException exception, boolean 
adjustSequenceNumbers) {
+    private void failBatch(ProducerBatch batch, 
ProduceResponse.PartitionResponse response, RuntimeException exception,
+                           boolean adjustSequenceNumbers) {
         failBatch(batch, response.baseOffset, response.logAppendTime, 
exception, adjustSequenceNumbers);
     }
 
-    private void failBatch(ProducerBatch batch, long baseOffset, long 
logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
+    private void failBatch(ProducerBatch batch, long baseOffset, long 
logAppendTime, RuntimeException exception,
+        boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
             if (exception instanceof OutOfOrderSequenceException
                     && !transactionManager.isTransactional()
                     && transactionManager.hasProducerId(batch.producerId())) {
                 log.error("The broker returned {} for topic-partition " +
-                                "{} at offset {}. This indicates data loss on 
the broker, and should be investigated.",
+                            "{} at offset {}. This indicates data loss on the 
broker, and should be investigated.",
                         exception, batch.topicPartition, baseOffset);
 
                 // Reset the transaction state since we have hit an 
irrecoverable exception and cannot make any guarantees
@@ -633,19 +713,23 @@ public class Sender implements Runnable {
 
         this.sensors.recordErrors(batch.topicPartition.topic(), 
batch.recordCount);
 
-        if (batch.done(baseOffset, logAppendTime, exception))
+        if (batch.done(baseOffset, logAppendTime, exception)) {
+            maybeRemoveFromInflightBatches(batch);
             this.accumulator.deallocate(batch);
+        }
     }
 
     /**
      * We can retry a send if the error is transient and the number of 
attempts taken is fewer than the maximum allowed.
-     * We can also retry OutOfOrderSequence exceptions for future batches, 
since if the first batch has failed, the future
-     * batches are certain to fail with an OutOfOrderSequence exception.
+     * We can also retry OutOfOrderSequence exceptions for future batches, 
since if the first batch has failed, the
+     * future batches are certain to fail with an OutOfOrderSequence exception.
      */
-    private boolean canRetry(ProducerBatch batch, 
ProduceResponse.PartitionResponse response) {
-        return batch.attempts() < this.retries &&
-                ((response.error.exception() instanceof RetriableException) ||
-                        (transactionManager != null && 
transactionManager.canRetry(response, batch)));
+    private boolean canRetry(ProducerBatch batch, 
ProduceResponse.PartitionResponse response, long now) {
+        return 
!batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
+            batch.attempts() < this.retries &&
+            batch.finalState() == null &&
+            ((response.error.exception() instanceof RetriableException) ||
+                (transactionManager != null && 
transactionManager.canRetry(response, batch)));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 1713d78..4f8420b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -65,7 +65,7 @@ public class AbstractConfig {
             this.values.put(update.getKey(), update.getValue());
         }
         definition.parse(this.values);
-        this.used = Collections.synchronizedSet(new HashSet<String>());
+        this.used = Collections.synchronizedSet(new HashSet<>());
         this.definition = definition;
         if (doLog)
             logAll();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c9efb82..12e467c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -57,7 +57,7 @@ import java.util.Set;
  * Map&lt;String, String&gt; props = new HashMap&lt;&gt();
  * props.put(&quot;config_with_default&quot;, &quot;some value&quot;);
  * props.put(&quot;config_with_dependents&quot;, &quot;some other value&quot;);
- * 
+ *
  * Map&lt;String, Object&gt; configs = defs.parse(props);
  * // will return &quot;some value&quot;
  * String someConfig = (String) configs.get(&quot;config_with_default&quot;);
@@ -595,10 +595,8 @@ public class ConfigDef {
         if (!configKeys.containsKey(name)) {
             return;
         }
-        
         ConfigKey key = configKeys.get(name);
         ConfigValue value = configs.get(name);
-        
         if (key.recommender != null) {
             try {
                 List<Object> recommendedValues = 
key.recommender.validValues(name, parsed);
@@ -845,6 +843,11 @@ public class ConfigDef {
         private final Number min;
         private final Number max;
 
+        /**
+         *  A numeric range with inclusive upper bound and inclusive lower 
bound
+         * @param min  the lower bound
+         * @param max  the upper bound
+         */
         private Range(Number min, Number max) {
             this.min = min;
             this.max = max;
@@ -860,7 +863,7 @@ public class ConfigDef {
         }
 
         /**
-         * A numeric range that checks both the upper and lower bound
+         * A numeric range that checks both the upper (inclusive) and lower 
(inclusive) bound
          */
         public static Range between(Number min, Number max) {
             return new Range(min, max);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6b41a9e..a586af8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -280,7 +280,6 @@ public class MockClient implements KafkaClient {
         checkTimeoutOfPendingRequests(now);
 
         List<ClientResponse> copy = new ArrayList<>(this.responses);
-
         if (metadata != null && metadata.updateRequested()) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
             if (cluster != null)
@@ -351,7 +350,9 @@ public class MockClient implements KafkaClient {
 
 
     public void respond(AbstractResponse response, boolean disconnected) {
-        ClientRequest request = requests.remove();
+        ClientRequest request = null;
+        if (requests.size() > 0)
+            request = requests.remove();
         short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), 
request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, 
null, null, response));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c83fe06..634a1ab 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1679,7 +1679,7 @@ public class KafkaConsumerTest {
     }
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> 
offsets) {
-        return listOffsetsResponse(offsets, Collections.<TopicPartition, 
Errors>emptyMap());
+        return listOffsetsResponse(offsets, Collections.emptyMap());
     }
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> 
partitionOffsets,
@@ -1818,7 +1818,7 @@ public class KafkaConsumerTest {
                 requestTimeoutMs,
                 IsolationLevel.READ_UNCOMMITTED);
 
-        return new KafkaConsumer<String, String>(
+        return new KafkaConsumer<>(
                 loggerFactory,
                 clientId,
                 consumerCoordinator,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2f89d79..6a85449 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -226,40 +226,30 @@ public class ProducerBatchTest {
     }
 
     /**
-     * A {@link ProducerBatch} configured using a very large linger value and 
a timestamp preceding its create
-     * time is interpreted correctly as not expired when the linger time is 
larger than the difference
-     * between now and create time by {@link ProducerBatch#maybeExpire(int, 
long, long, long, boolean)}.
+     * A {@link ProducerBatch} configured using a timestamp preceding its 
create time is interpreted correctly
+     * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, 
long)}.
      */
     @Test
-    public void testLargeLingerOldNowExpire() {
+    public void testBatchExpiration() {
+        long deliveryTimeoutMs = 10240;
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 
1), memoryRecordsBuilder, now);
         // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, 
false));
+        assertFalse(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now - 
2));
+        // Set `now` to deliveryTimeoutMs after the create time.
+        assertTrue(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now + 
deliveryTimeoutMs));
     }
 
     /**
-     * A {@link ProducerBatch} configured using a very large retryBackoff 
value with retry = true and a timestamp
-     * preceding its create time is interpreted correctly as not expired when 
the retryBackoff time is larger than the
-     * difference between now and create time by {@link 
ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
+     * A {@link ProducerBatch} configured using a timestamp preceding its 
create time is interpreted correctly
+     * as not expired by {@link 
ProducerBatch#hasReachedDeliveryTimeout(long, long)}.
      */
     @Test
-    public void testLargeRetryBackoffOldNowExpire() {
+    public void testBatchExpirationAfterReenqueue() {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 
1), memoryRecordsBuilder, now);
         // Set batch.retry = true
         batch.reenqueued(now);
         // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, 
false));
-    }
-
-    /**
-     * A {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)} 
call with a now value before the create
-     * time of the ProducerBatch is correctly recognized as not expired when 
invoked with parameter isFull = true.
-     */
-    @Test
-    public void testLargeFullOldNowExpire() {
-        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 
1), memoryRecordsBuilder, now);
-        // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+        assertFalse(batch.hasReachedDeliveryTimeout(10240, now - 2L));
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5f48410..13b0d1b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -83,10 +83,9 @@ public class RecordAccumulatorTest {
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
-    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, 
value.length,
-            Record.EMPTY_HEADERS);
+    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, 
value.length, Record.EMPTY_HEADERS);
     private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1, part2, part3),
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+        Collections.emptySet(), Collections.emptySet());
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();
@@ -255,7 +254,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = createTestRecordAccumulator(
-                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, 
CompressionType.NONE, 0L);
+            1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, 
CompressionType.NONE, 0L);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -299,8 +298,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 
batchSize, CompressionType.NONE, lingerMs);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+                10 * batchSize, CompressionType.NONE, lingerMs);
         // Just short of going over the limit so we trigger linger time
         int appends = expectedNumAppends(batchSize);
 
@@ -332,10 +331,17 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testRetryBackoff() throws Exception {
-        long lingerMs = Long.MAX_VALUE / 4;
-        long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 
+ DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, 
new ApiVersions(), null);
+        long lingerMs = Integer.MAX_VALUE / 16;
+        long retryBackoffMs = Integer.MAX_VALUE / 8;
+        int requestTimeoutMs = Integer.MAX_VALUE / 4;
+        long deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        String metricGrpName = "producer-metrics";
+
+        final RecordAccumulator accum = new RecordAccumulator(logContext, 
batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, 
metrics, metricGrpName, time, new ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
 
         long now = time.milliseconds();
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs);
@@ -371,7 +377,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testFlush() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        long lingerMs = Integer.MAX_VALUE;
         final RecordAccumulator accum = createTestRecordAccumulator(
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 
1024, CompressionType.NONE, lingerMs);
 
@@ -413,7 +419,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 
1024, CompressionType.NONE, Long.MAX_VALUE);
+            4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, 
CompressionType.NONE, Long.MAX_VALUE);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, 
Record.EMPTY_HEADERS, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -429,12 +435,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAbortIncompleteBatches() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new 
AtomicInteger(0);
         final RecordAccumulator accum = createTestRecordAccumulator(
-                128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, 
CompressionType.NONE, lingerMs);
+            128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, 
CompressionType.NONE, lingerMs);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception 
exception) {
@@ -468,7 +474,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAbortUnsentBatches() throws Exception {
-        long lingerMs = Long.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new 
AtomicInteger(0);
@@ -509,17 +515,65 @@ public class RecordAccumulatorTest {
         assertTrue(accum.hasIncomplete());
     }
 
+    private void doExpireBatchSingle(long deliveryTimeoutMs) throws 
InterruptedException {
+        long lingerMs = 300L;
+        List<Boolean> muteStates = Arrays.asList(false, true);
+        Set<Node> readyNodes = null;
+        List<ProducerBatch> expiredBatches = new ArrayList<>();
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+        RecordAccumulator accum = 
createTestRecordAccumulator(deliveryTimeoutMs,
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 
batchSize, CompressionType.NONE, lingerMs);
+
+        // Make the batches ready due to linger. These batches are not in retry
+        for (Boolean mute: muteStates) {
+            if (time.milliseconds() < System.currentTimeMillis())
+                time.setCurrentTimeMs(System.currentTimeMillis());
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs);
+            assertEquals("No partition should be ready.", 0, 
accum.ready(cluster, time.milliseconds()).readyNodes.size());
+
+            time.sleep(lingerMs);
+            readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
+
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("The batch should not expire when just linger has 
passed", 0, expiredBatches.size());
+
+            if (mute)
+                accum.mutePartition(tp1);
+            else
+                accum.unmutePartition(tp1, 0L);
+
+            // Advance the clock to expire the batch.
+            time.sleep(deliveryTimeoutMs - lingerMs);
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("The batch may expire when the partition is muted", 
1, expiredBatches.size());
+            assertEquals("No partitions should be ready.", 0, 
accum.ready(cluster, time.milliseconds()).readyNodes.size());
+        }
+    }
+
+    @Test
+    public void testExpiredBatchSingle() throws InterruptedException {
+        doExpireBatchSingle(3200L);
+    }
+
+    @Test
+    public void testExpiredBatchSingleMaxValue() throws InterruptedException {
+        doExpireBatchSingle(Long.MAX_VALUE);
+    }
+
     @Test
     public void testExpiredBatches() throws InterruptedException {
         long retryBackoffMs = 100L;
-        long lingerMs = 3000L;
+        long lingerMs = 30L;
         int requestTimeout = 60;
+        long deliveryTimeoutMs = 3200L;
 
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 
batchSize, CompressionType.NONE, lingerMs);
+            deliveryTimeoutMs, batchSize + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 
lingerMs);
         int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
@@ -532,14 +586,14 @@ public class RecordAccumulatorTest {
         Set<Node> readyNodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
         // Advance the clock to expire the batch.
-        time.sleep(requestTimeout + 1);
+        time.sleep(deliveryTimeoutMs + 1);
         accum.mutePartition(tp1);
-        List<ProducerBatch> expiredBatches = 
accum.expiredBatches(requestTimeout, time.milliseconds());
-        assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
+        List<ProducerBatch> expiredBatches = 
accum.expiredBatches(time.milliseconds());
+        assertEquals("The batches will be muted no matter if the partition is 
muted or not", 2, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
-        assertEquals("The batch should be expired", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired earlier", 0, 
expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
 
         // Advance the clock to make the next batch ready due to linger.ms
@@ -548,12 +602,12 @@ public class RecordAccumulatorTest {
         time.sleep(requestTimeout + 1);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when metadata is still 
available and partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
-        assertEquals("The batch should be expired when the partition is not 
muted", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired", 0, 
expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
 
         // Test batches in retry.
@@ -569,17 +623,17 @@ public class RecordAccumulatorTest {
 
         // test expiration.
         time.sleep(requestTimeout + retryBackoffMs);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired.", 0, 
expiredBatches.size());
         time.sleep(1L);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1, 0L);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
-        assertEquals("The batch should be expired when the partition is not 
muted.", 1, expiredBatches.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired.", 0, 
expiredBatches.size());
 
         // Test that when being throttled muted batches are expired before the 
throttle time is over.
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
@@ -589,20 +643,20 @@ public class RecordAccumulatorTest {
         // Advance the clock to expire the batch.
         time.sleep(requestTimeout + 1);
         accum.mutePartition(tp1);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
 
         long throttleTimeMs = 100L;
         accum.unmutePartition(tp1, time.milliseconds() + throttleTimeMs);
         // The batch shouldn't be expired yet.
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
         assertEquals("The batch should not be expired when the partition is 
muted", 0, expiredBatches.size());
 
         // Once the throttle time is over, the batch can be expired.
         time.sleep(throttleTimeMs);
-        expiredBatches = accum.expiredBatches(requestTimeout, 
time.milliseconds());
-        assertEquals("The batch should be expired", 1, expiredBatches.size());
-        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
+        expiredBatches = accum.expiredBatches(time.milliseconds());
+        assertEquals("All batches should have been expired earlier", 0, 
expiredBatches.size());
+        assertEquals("No partitions should be ready.", 1, accum.ready(cluster, 
time.milliseconds()).readyNodes.size());
     }
 
     @Test
@@ -646,10 +700,18 @@ public class RecordAccumulatorTest {
         // Simulate talking to an older broker, ie. one which supports a lower 
magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
+        int requestTimeoutMs = 1600;
+        long deliveryTimeoutMs = 3200L;
+        long lingerMs = 10L;
+        long retryBackoffMs = 100L;
+        long totalSize = 10 * batchSize;
+        String metricGrpName = "producer-metrics";
+
         apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new 
ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
                 (short) 0, (short) 2))));
-        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize 
+ DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, 10, 100L, metrics, time, apiVersions, 
new TransactionManager());
+        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize 
+ DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+            CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, 
metrics, metricGrpName, time, apiVersions, new TransactionManager(),
+            new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
     }
 
@@ -727,9 +789,9 @@ public class RecordAccumulatorTest {
             assertFalse(drained.get(node1.id()).isEmpty());
         }
         assertTrue("All the batches should have been drained.",
-                   accum.ready(cluster, 
time.milliseconds()).readyNodes.isEmpty());
+                accum.ready(cluster, 
time.milliseconds()).readyNodes.isEmpty());
         assertEquals("The split batches should be allocated off the 
accumulator",
-                     bufferCapacity, accum.bufferPoolAvailableMemory());
+                bufferCapacity, accum.bufferPoolAvailableMemory());
     }
 
     @Test
@@ -760,8 +822,78 @@ public class RecordAccumulatorTest {
             numSplit += result.numSplit;
             numBatches += result.numBatches;
             assertTrue(String.format("Total num batches = %d, split batches = 
%d, more than 10%% of the batch splits. "
-                                         + "Random seed is " + seed,
-                                     numBatches, numSplit), (double) numSplit 
/ numBatches < 0.1f);
+                    + "Random seed is " + seed,
+                numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+        }
+    }
+
+    @Test
+    public void testSoonToExpireBatchesArePickedUpForExpiry() throws 
InterruptedException {
+        long lingerMs = 500L;
+        int batchSize = 1025;
+
+        RecordAccumulator accum = createTestRecordAccumulator(
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 
batchSize, CompressionType.NONE, lingerMs);
+
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs);
+        Set<Node> readyNodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, 
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertTrue(drained.isEmpty());
+        //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+        // Advance the clock and send one batch out; it should not be included 
in the soon-to-expire in-flight
+        // batches because the batch's expiry is still far off.
+        time.sleep(lingerMs + 1);
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, 
time.milliseconds());
+        assertEquals("A batch did not drain after linger", 1, drained.size());
+        //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+        // Queue another batch and advance clock such that batch expiry time 
is earlier than request timeout.
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs);
+        time.sleep(lingerMs * 4);
+
+        // Now drain and check that accumulator picked up the drained batch 
because its expiry is soon.
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, 
time.milliseconds());
+        assertEquals("A batch did not drain after linger", 1, drained.size());
+    }
+
+    @Test
+    public void testExpiredBatchesRetry() throws InterruptedException {
+        int lingerMs = 3000;
+        int rtt = 1000;
+        int deliveryTimeoutMs = 3200;
+        Set<Node> readyNodes;
+        List<ProducerBatch> expiredBatches;
+        List<Boolean> muteStates = Arrays.asList(false, true);
+
+        // test case assumes that the records do not fill the batch completely
+        int batchSize = 1025;
+        RecordAccumulator accum = createTestRecordAccumulator(
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 
batchSize, CompressionType.NONE, lingerMs);
+
+        // Test batches in retry.
+        for (Boolean mute: muteStates) {
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
+            time.sleep(lingerMs);
+            readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), readyNodes);
+            Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, 
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+            assertEquals("There should be only one batch.", 1, 
drained.get(node1.id()).size());
+            time.sleep(rtt);
+            accum.reenqueue(drained.get(node1.id()).get(0), 
time.milliseconds());
+
+            if (mute)
+                accum.mutePartition(tp1);
+            else
+                accum.unmutePartition(tp1, 0L);
+
+            // test expiration
+            time.sleep(deliveryTimeoutMs - rtt);
+            accum.drain(cluster, Collections.singleton(node1), 
Integer.MAX_VALUE, time.milliseconds());
+            expiredBatches = accum.expiredBatches(time.milliseconds());
+            assertEquals("RecordAccumulator has expired batches if the 
partition is not muted", mute  ? 1 : 0, expiredBatches.size());
         }
     }
 
@@ -852,7 +984,7 @@ public class RecordAccumulatorTest {
         int offsetDelta = 0;
         while (true) {
             int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, 
key.length, value.length,
-                    Record.EMPTY_HEADERS);
+                Record.EMPTY_HEADERS);
             if (size + recordSize > batchSize)
                 return offsetDelta;
             offsetDelta += 1;
@@ -860,20 +992,32 @@ public class RecordAccumulatorTest {
         }
     }
 
+
+    private RecordAccumulator createTestRecordAccumulator(int batchSize, long 
totalSize, CompressionType type, long lingerMs) {
+        long deliveryTimeoutMs = 3200L;
+        return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, 
totalSize, type, lingerMs);
+    }
+
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(int batchSize, long 
totalSize, CompressionType type, long lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(long 
deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long 
lingerMs) {
+        long retryBackoffMs = 100L;
+        int requestTimeoutMs = 1600;
+        String metricGrpName = "producer-metrics";
+
         return new RecordAccumulator(
-                logContext,
-                batchSize,
-                totalSize,
-                type,
-                lingerMs,
-                100L,
-                metrics,
-                time,
-                new ApiVersions(),
-                null);
+            logContext,
+            batchSize,
+            type,
+            lingerMs,
+            retryBackoffMs,
+            deliveryTimeoutMs,
+            metrics,
+            metricGrpName,
+            time,
+            new ApiVersions(),
+            null,
+            new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index d87c8f9..2fbe3df 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,6 +16,21 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
@@ -62,6 +77,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
@@ -69,25 +85,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SenderTest {
@@ -131,10 +132,12 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, 
client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
         assertTrue(client.hasInFlightRequests());
         client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", 0, 
client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
@@ -328,33 +331,42 @@ public class SenderTest {
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
             client.disconnect(id);
             assertEquals(0, client.inFlightRequestCount());
             assertFalse(client.hasInFlightRequests());
             assertFalse("Client ready status should be false", 
client.isReady(node, 0L));
+            // the batch is still tracked in sender.inFlightBatches until it expires
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             sender.run(time.milliseconds()); // receive error
             sender.run(time.milliseconds()); // reconnect
             sender.run(time.milliseconds()); // resend
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             long offset = 0;
             client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
             sender.run(time.milliseconds());
             assertTrue("Request should have retried and completed", 
future.isDone());
             assertEquals(offset, future.get().offset());
+            assertEquals(0, sender.inFlightBatches(tp0).size());
 
             // do an unsuccessful retry
             future = accumulator.append(tp0, 0L, "key".getBytes(), 
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
+            assertEquals(1, sender.inFlightBatches(tp0).size());
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().destination());
                 sender.run(time.milliseconds()); // receive error
+                assertEquals(0, sender.inFlightBatches(tp0).size());
                 sender.run(time.milliseconds()); // reconnect
                 sender.run(time.milliseconds()); // resend
+                assertEquals(i > 0 ? 0 : 1, 
sender.inFlightBatches(tp0).size());
             }
             sender.run(time.milliseconds());
             assertFutureFailure(future, NetworkException.class);
+            assertEquals(0, sender.inFlightBatches(tp0).size());
         } finally {
             m.close();
         }
@@ -371,7 +383,7 @@ public class SenderTest {
                     senderMetrics, time, REQUEST_TIMEOUT, 50, null, 
apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and 
partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
-            metadata.update(cluster1, Collections.<String>emptySet(), 
time.milliseconds());
+            metadata.update(cluster1, Collections.emptySet(), 
time.milliseconds());
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
@@ -384,6 +396,7 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
+            assertEquals(1, sender.inFlightBatches(tp2).size());
 
             time.sleep(900);
             // Now send another message to tp2
@@ -391,11 +404,13 @@ public class SenderTest {
 
             // Update metadata before sender receives response from broker 0. 
Now partition 2 moves to broker 0
             Cluster cluster2 = TestUtils.singletonCluster("test", 2);
-            metadata.update(cluster2, Collections.<String>emptySet(), 
time.milliseconds());
+            metadata.update(cluster2, Collections.emptySet(), 
time.milliseconds());
             // Sender should not send the second message to node 0.
-            sender.run(time.milliseconds());
+            assertEquals(1, sender.inFlightBatches(tp2).size());
+            sender.run(time.milliseconds());  // receive the response for the 
previous send, and send the new batch
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
+            assertEquals(1, sender.inFlightBatches(tp2).size());
         } finally {
             m.close();
         }
@@ -429,14 +444,18 @@ public class SenderTest {
 
         // Advance the clock to expire the first batch.
         time.sleep(10000);
+
+        Node clusterNode = this.cluster.nodes().get(0);
+        Map<Integer, List<ProducerBatch>> drainedBatches =
+            accumulator.drain(cluster, Collections.singleton(clusterNode), 
Integer.MAX_VALUE, time.milliseconds());
+        sender.addToInflightBatches(drainedBatches);
+
         // Disconnect the target node for the pending produce request. This 
will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
         sender.run(time.milliseconds());  // We should try to flush the batch, 
but we expire it instead without sending anything.
-
         assertEquals("Callbacks not invoked for expiry", messagesPerBatch, 
expiryCallbackCount.get());
         assertNull("Unexpected exception", unexpectedException.get());
         // Make sure that the records were appended back to the batch.
@@ -463,6 +482,7 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
 
@@ -479,6 +499,7 @@ public class SenderTest {
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
     }
@@ -520,6 +541,7 @@ public class SenderTest {
         Node node = new Node(Integer.parseInt(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertTrue(client.hasInFlightRequests());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
         assertTrue("Client ready status should be true", client.isReady(node, 
0L));
         assertFalse(future.isDone());
 
@@ -583,6 +605,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // receive response 1
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
     }
@@ -654,11 +677,12 @@ public class SenderTest {
         assertEquals(0, transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
-
-
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
         sender.run(time.milliseconds()); // send request 2;
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
         sender.run(time.milliseconds());  // receive response 2
@@ -667,17 +691,19 @@ public class SenderTest {
         assertEquals(1, request2.get().offset());
 
         assertFalse(client.hasInFlightRequests());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
 
         sender.run(time.milliseconds()); // send request 3
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
         sender.run(time.milliseconds());  // receive response 3, send request 
4 since we are out of 'retry' mode.
         assertEquals(2, transactionManager.lastAckedSequence(tp0));
         assertTrue(request3.isDone());
         assertEquals(2, request3.get().offset());
-
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
         sender.run(time.milliseconds());  // receive response 4
@@ -795,7 +821,6 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
@@ -965,46 +990,54 @@ public class SenderTest {
     public void 
testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() 
throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        setupWithTransactionState(transactionManager);
+        setupWithTransactionState(transactionManager, false, null);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
+        // We separate the two appends by 1 second so that the two batches
+        // don't expire at the same time.
+        time.sleep(1000L);
 
         Future<RecordMetadata> request2 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
-
         assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, sender.inFlightBatches(tp0).size());
 
         sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
         sender.run(time.milliseconds());  // receive first response
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        // We add 600 millis to expire the first batch but not the second.
+        // Note deliveryTimeoutMs is 1500.
+        time.sleep(600L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
         sender.run(time.milliseconds()); // now expire the first batch.
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
         // let's enqueue another batch, which should not be dequeued until the 
unresolved state is clear.
         Future<RecordMetadata> request3 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
-
         time.sleep(20);
-
         assertFalse(request2.isDone());
 
         sender.run(time.milliseconds());  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
         sender.run(time.milliseconds()); // receive second response, the third 
request shouldn't be sent since we are in an unresolved state.
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
-        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        assertEquals(0, sender.inFlightBatches(tp0).size());
 
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
         assertEquals(1, batches.size());
         assertFalse(batches.peekFirst().hasSequence());
         assertFalse(client.hasInFlightRequests());
@@ -1017,6 +1050,7 @@ public class SenderTest {
         assertEquals(0, batches.size());
         assertEquals(1, client.inFlightRequestCount());
         assertFalse(request3.isDone());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
     }
 
     @Test
@@ -1026,13 +1060,13 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
 
+        time.sleep(1000L);
         Future<RecordMetadata> request2 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
 
@@ -1042,7 +1076,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // receive first response
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        time.sleep(1000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
@@ -1053,9 +1087,7 @@ public class SenderTest {
         Future<RecordMetadata> request3 = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
 
         time.sleep(20);
-
         assertFalse(request2.isDone());
-
         sender.run(time.milliseconds());  // send second request
         sendIdempotentProducerResponse(1, tp0, 
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
         sender.run(time.milliseconds()); // receive second response, the third 
request shouldn't be sent since we are in an unresolved state.
@@ -1087,12 +1119,12 @@ public class SenderTest {
         Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, 
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // send request
         sendIdempotentProducerResponse(0, tp0, 
Errors.NOT_LEADER_FOR_PARTITION, -1);
-        sender.run(time.milliseconds());  // receive response
 
+        sender.run(time.milliseconds());  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
 
         Node node = this.cluster.nodes().get(0);
-        time.sleep(10000L);
+        time.sleep(15000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
 
@@ -1520,7 +1552,6 @@ public class SenderTest {
                 RecordBatch firstBatch = batchIterator.next();
                 assertFalse(batchIterator.hasNext());
                 assertEquals(expectedSequence, firstBatch.baseSequence());
-
                 return true;
             }
         }, produceResponse(tp, responseOffset, responseError, 0, 
logStartOffset));
@@ -1754,11 +1785,13 @@ public class SenderTest {
         sender.run(time.milliseconds());  // send.
 
         assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
 
         client.respond(produceResponse(tp0, 0, 
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
 
         sender.run(time.milliseconds());
         assertTrue(responseFuture.isDone());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
         assertFalse("Expected transaction state to be reset upon receiving an 
OutOfOrderSequenceException", transactionManager.hasProducerId());
     }
 
@@ -1794,11 +1827,15 @@ public class SenderTest {
                                        TopicPartition tp) throws Exception {
         int maxRetries = 1;
         String topic = tp.topic();
+        long deliveryTimeoutMs = 3000L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 
0.2f);
         try (Metrics m = new Metrics()) {
-            accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 
1024, CompressionType.GZIP, 0L, 0L, m, time,
-                    new ApiVersions(), txnManager);
+            accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.GZIP,
+                0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new 
ApiVersions(), txnManager,
+                new BufferPool(totalSize, batchSize, metrics, time, 
"producer-internal-metrics"));
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, 
new ApiVersions());
@@ -1865,9 +1902,153 @@ public class SenderTest {
             assertEquals("The last ack'd sequence number should be 1", 1, 
txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, 
f2.get().offset());
             assertTrue("There should be no batch in the accumulator", 
accumulator.batches().get(tp).isEmpty());
+            assertTrue("There should be a split", 
m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+        }
+    }
+
+    @Test
+    public void testNoDoubleDeallocation() throws Exception {
+        long deliverTimeoutMs = 1500L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-custom-metrics";
+        MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, 
metrics, time, metricGrpName);
+        setupWithTransactionState(null, false, pool);
 
-            assertTrue("There should be a split",
-                    m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 =
+            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), 
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliverTimeoutMs);
+        assertFalse(pool.allMatch());
+
+        sender.run(time.milliseconds());  // expire the batch
+        assertTrue(request1.isDone());
+        assertTrue("The batch should have been de-allocated", pool.allMatch());
+        assertTrue(pool.allMatch());
+
+        sender.run(time.milliseconds());
+        assertTrue("The batch should have been de-allocated", pool.allMatch());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+    }
+
+    @Test
+    public void testInflightBatchesExpireOnDeliveryTimeout() throws 
InterruptedException {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 
MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals("Expect one in-flight batch in accumulator", 1, 
sender.inFlightBatches(tp0).size());
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = 
new HashMap<>();
+        responseMap.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(new ProduceResponse(responseMap));
+
+        time.sleep(deliveryTimeoutMs);
+        sender.run(time.milliseconds());  // receive first response
+        assertEquals("Expect zero in-flight batch in accumulator", 0, 
sender.inFlightBatches(tp0).size());
+        try {
+            request.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
+    @Test
+    public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() 
throws InterruptedException {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send first ProduceRequest
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), 
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliveryTimeoutMs / 2);
+
+        // Send second ProduceRequest
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), 
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());  // must not send request because the 
partition is muted
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+
+        time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
+
+        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+        sender.run(time.milliseconds());  // receive response (offset=0)
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds());  // Drain the second request only 
this time
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+    }
+
+    @Test
+    public void testExpiredBatchDoesNotRetry() throws Exception {
+        long deliverTimeoutMs = 1500L;
+        setupWithTransactionState(null, false, null);
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 =
+            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), 
"value".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        time.sleep(deliverTimeoutMs);
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = 
new HashMap<>();
+        responseMap.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(produceResponse(tp0, -1, 
Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
+
+        sender.run(time.milliseconds());  // expire the batch
+        assertTrue(request1.isDone());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds()); // receive first response and do not 
reenqueue.
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        sender.run(time.milliseconds()); // run again and must not send 
anything.
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+    }
+
+    private class MatchingBufferPool extends BufferPool {
+        IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
+
+        MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, 
Time time, String metricGrpName) {
+            super(totalSize, batchSize, metrics, time, metricGrpName);
+            allocatedBuffers = new IdentityHashMap<>();
+        }
+
+        @Override
+        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
+            ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
+            allocatedBuffers.put(buffer, Boolean.TRUE);
+            return buffer;
+        }
+
+        @Override
+        public void deallocate(ByteBuffer buffer, int size) {
+            if (!allocatedBuffers.containsKey(buffer)) {
+                throw new IllegalStateException("Deallocating a buffer that is 
not allocated");
+            }
+            allocatedBuffers.remove(buffer);
+            super.deallocate(buffer, size);
+        }
+
+        public boolean allMatch() {
+            return allocatedBuffers.isEmpty();
         }
     }
 
@@ -1931,17 +2112,29 @@ public class SenderTest {
     }
 
     private void setupWithTransactionState(TransactionManager 
transactionManager) {
+        setupWithTransactionState(transactionManager, false, null);
+    }
+
+    private void setupWithTransactionState(TransactionManager 
transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 
1024, CompressionType.NONE, 0L, 0L, metrics, time,
-                apiVersions, transactionManager);
-        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+        BufferPool pool = (customPool == null) ? new BufferPool(totalSize, 
batchSize, metrics, time, metricGrpName) : customPool;
+        setupWithTransactionState(transactionManager, guaranteeOrder, 
metricTags, pool);
+    }
 
-        this.sender = new Sender(logContext, this.client, this.metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, 
REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.<String>emptySet(), 
time.milliseconds());
+    private void setupWithTransactionState(TransactionManager 
transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, 
BufferPool pool) {
+        long deliveryTimeoutMs = 1500L;
+        String metricGrpName = "producer-metrics";
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, 0L, 0L,
+            deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, 
transactionManager, pool);
+        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+        this.sender = new Sender(logContext, this.client, this.metadata, 
this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
+            Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, 
REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+        this.metadata.update(this.cluster, Collections.emptySet(), 
time.milliseconds());
     }
 
     private void assertSendFailure(Class<? extends RuntimeException> 
expectedError) throws Exception {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 558ec72..550d003 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,6 +118,10 @@ public class TransactionManagerTest {
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         int batchSize = 16 * 1024;
+        int requestTimeoutMs = 1500;
+        long deliveryTimeoutMs = 3000L;
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.brokerNode = new Node(0, "localhost", 2211);
         this.transactionManager = new TransactionManager(logContext, 
transactionalId, transactionTimeoutMs,
@@ -125,7 +129,7 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         SenderMetricsRegistry senderMetrics = new 
SenderMetricsRegistry(metrics);
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 
1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, 
transactionManager);
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, 0L, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, 
apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, 
time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, 
this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, 
transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), 
time.milliseconds());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7291d4f..1f62103 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -139,6 +139,7 @@ public class Worker {
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
         // User-specified overrides
         producerProps.putAll(config.originalsWithPrefix("producer."));
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index dc4041f..739675e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -68,7 +68,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
     super.tearDown()
   }
 
-  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: 
Long = 0, props: Option[Properties] = None): 
KafkaProducer[Array[Byte],Array[Byte]] = {
+  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: 
Int = 0, props: Option[Properties] = None): 
KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties, retries = retries, lingerMs = 
lingerMs, props = props)
     registerProducer(producer)
@@ -170,13 +170,13 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
   def testSendCompressedMessageWithCreateTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = 
Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = 
Int.MaxValue, props = Some(producerProps))
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithCreateTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = 
Long.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = 
Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
@@ -409,7 +409,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
    */
   @Test
   def testFlush() {
-    val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+    val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
     try {
       createTopic(topic, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -438,7 +438,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
 
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(0, TimeUnit.MILLISECONDS)
@@ -478,7 +478,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
       }
     }
     for (i <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
       try {
         // send message to partition 0
         // Only send the records in the first callback since we close the 
producer in the callback and no records
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 02396dd..ba4df7d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -617,9 +617,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
CompressionType.GZIP.name)
-    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, 
Long.MaxValue.toString)
+    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, 
Int.MaxValue.toString)
     val producer = TestUtils.createProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile,
-        saslProperties = clientSaslProperties, retries = 0, lingerMs = 
Long.MaxValue, props = Some(producerProps))
+        saslProperties = clientSaslProperties, retries = 0, lingerMs = 
Int.MaxValue, props = Some(producerProps))
     (0 until numRecords).foreach { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key 
$i".getBytes, s"value $i".getBytes))
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 929dbe4..1da6f9e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -45,7 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testBatchSizeZero() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
-    val producer = createProducer(brokerList = brokerList, lingerMs = 
Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = 
Int.MaxValue, props = Some(producerProps))
     sendAndVerify(producer)
   }
 
@@ -53,13 +53,13 @@ class PlaintextProducerSendTest extends 
BaseProducerSendTest {
   def testSendCompressedMessageWithLogAppendTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = 
Long.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList, lingerMs = 
Int.MaxValue, props = Some(producerProps))
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithLogAppendTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = 
Long.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = 
Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 9b77c2d..0227690 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createProducer(brokerList, acks = 0, 
requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer1 = TestUtils.createProducer(brokerList, acks = 0, 
requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer2 = TestUtils.createProducer(brokerList, acks = 1, 
requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer2 = TestUtils.createProducer(brokerList, acks = 1, 
requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
-    producer3 = TestUtils.createProducer(brokerList, acks = -1, 
requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+    producer3 = TestUtils.createProducer(brokerList, acks = -1, 
requestTimeoutMs = 30000, maxBlockMs = 10000L,
       bufferSize = producerBufferSize)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 49553e8..61d5919 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1368,11 +1368,11 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   private case class ProducerBuilder() extends 
ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
     private var _acks = -1
-    private var _requestTimeoutMs = 30000L
+    private var _requestTimeoutMs = 30000
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this 
}
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
-    def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { 
_requestTimeoutMs = timeoutMs; this }
+    def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { 
_requestTimeoutMs = timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createProducer(bootstrapServers,
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 67f33eb..57aca1e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -204,7 +204,7 @@ class FetchRequestTest extends BaseRequestTest {
     val propsOverride = new Properties
     propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     val producer = 
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, lingerMs = Long.MaxValue,
+      retries = 5, lingerMs = Int.MaxValue,
       keySerializer = new StringSerializer, valueSerializer = new 
ByteArraySerializer, props = Some(propsOverride))
     val bytes = new Array[Byte](msgValueLen)
     val futures = try {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cf60c78..aa902f2 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -548,8 +548,8 @@ object TestUtils extends Logging {
                            maxBlockMs: Long = 60 * 1000L,
                            bufferSize: Long = 1024L * 1024L,
                            retries: Int = 0,
-                           lingerMs: Long = 0,
-                           requestTimeoutMs: Long = 30 * 1000L,
+                           lingerMs: Int = 0,
+                           requestTimeoutMs: Int = 30 * 1000,
                            securityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT,
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
@@ -564,6 +564,11 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs.toString)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
+
+    // In case of overflow, set the maximum possible value for deliveryTimeoutMs
+    val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue 
else lingerMs + requestTimeoutMs
+    producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
deliveryTimeoutMs.toString)
 
     /* Only use these if not already set */
     val defaultProps = Map(
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ccfab95..ac1388e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -30,6 +30,11 @@
         offset retention period (or the one set by broker) has passed since 
their last commit.</li>
     <li>The default for console consumer's <code>enable.auto.commit</code> 
property when no <code>group.id</code> is provided is now set to 
<code>false</code>.
         This is to avoid polluting the consumer coordinator cache as the 
auto-generated group is not likely to be used by other consumers.</li>
+    <li>The default value for the producer's <code>retries</code> config was 
changed to <code>Integer.MAX_VALUE</code>, as we introduced 
<code>delivery.timeout.ms</code>
+        in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
+        which sets an upper bound on the total time between sending a record 
and receiving acknowledgement from the broker. By default,
+        the delivery timeout is set to 2 minutes.
+    </li>
 </ol>
 
 

Reply via email to