This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fc7136 KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
7fc7136 is described below
commit 7fc7136ffd28245629f00870fc7f7cbe1fe1c6ce
Author: Yu Yang <[email protected]>
AuthorDate: Thu Jul 26 09:13:50 2018 -0700
KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270)
Co-authored-by: Sumant Tambe <[email protected]>
Co-authored-by: Yu Yang <[email protected]>
Reviewers: Ted Yu <[email protected]>, Apurva Mehta <[email protected]>, Jason Gustafson <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 58 ++--
.../kafka/clients/producer/ProducerConfig.java | 24 +-
.../clients/producer/internals/ProducerBatch.java | 95 +++---
.../producer/internals/RecordAccumulator.java | 318 +++++++++++----------
.../kafka/clients/producer/internals/Sender.java | 188 ++++++++----
.../apache/kafka/common/config/AbstractConfig.java | 2 +-
.../org/apache/kafka/common/config/ConfigDef.java | 11 +-
.../java/org/apache/kafka/clients/MockClient.java | 5 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 4 +-
.../producer/internals/ProducerBatchTest.java | 32 +--
.../producer/internals/RecordAccumulatorTest.java | 248 ++++++++++++----
.../clients/producer/internals/SenderTest.java | 291 +++++++++++++++----
.../producer/internals/TransactionManagerTest.java | 6 +-
.../org/apache/kafka/connect/runtime/Worker.java | 1 +
.../kafka/api/BaseProducerSendTest.scala | 12 +-
.../kafka/api/PlaintextConsumerTest.scala | 4 +-
.../kafka/api/PlaintextProducerSendTest.scala | 6 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 6 +-
.../server/DynamicBrokerReconfigurationTest.scala | 4 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 9 +-
docs/upgrade.html | 5 +
22 files changed, 900 insertions(+), 431 deletions(-)
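
For context on the change below: KIP-91 introduces a single end-to-end bound on delivery, delivery.timeout.ms (default 120 * 1000 ms), which must be at least linger.ms + request.timeout.ms; the per-batch request-timeout expiry logic is removed in its favor. The following is a minimal illustrative sketch of how a producer might set the new config. It is not part of this patch; the topic name and bootstrap address are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeliveryTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // delivery.timeout.ms must be >= linger.ms + request.timeout.ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The returned future fails with a TimeoutException if the record cannot be
            // delivered within delivery.timeout.ms, regardless of the retries setting.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}

If an explicitly configured delivery.timeout.ms is smaller than linger.ms + request.timeout.ms, producer construction is rejected, as implemented in configureDeliveryTimeout below.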
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3991467..b40b09a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -16,6 +16,17 @@
*/
package org.apache.kafka.clients.producer;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
@@ -24,6 +35,7 @@ import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -69,18 +81,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
/**
@@ -235,6 +235,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
+ public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
private final String clientId;
// Visible for testing
@@ -392,18 +393,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null, log);
+ int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.totalMemorySize,
this.compressionType,
- config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+ config.getInt(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
+ deliveryTimeoutMs,
metrics,
+ PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
- transactionManager);
+ transactionManager,
+ new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (metadata != null) {
this.metadata = metadata;
@@ -459,10 +463,30 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
- private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
+ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
+ int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
+ int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+ int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+ if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
+ if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
+ // throw an exception if the user explicitly set an inconsistent value
+ throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
+ + " should be equal to or larger than " + ProducerConfig.LINGER_MS_CONFIG
+ + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ } else {
+ // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
+ deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+ log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
+ ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
+ }
+ }
+ return deliveryTimeoutMs;
+ }
+ private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
-
boolean userConfiguredIdempotence = false;
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
userConfiguredIdempotence = true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8e7b662..ab55353 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -99,6 +99,19 @@ public class ProducerConfig extends AbstractConfig {
+ "specified time waiting for
more records to show up. This setting defaults to 0 (i.e. no delay). Setting
<code>" + LINGER_MS_CONFIG + "=5</code>, "
+ "for example, would have the
effect of reducing the number of requests sent but would add up to 5ms of
latency to records sent in the absence of load.";
+ /** <code>request.timeout.ms</code> */
+ public static final String REQUEST_TIMEOUT_MS_CONFIG =
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+ private static final String REQUEST_TIMEOUT_MS_DOC =
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ + " This should be larger than replica.lag.time.max.ms (a broker
configuration)"
+ + " to reduce the possibility of message duplication due to
unnecessary producer retries.";
+
+ /** <code>delivery.timeout.ms</code> */
+ public static final String DELIVERY_TIMEOUT_MS_CONFIG =
"delivery.timeout.ms";
+ private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on
the time to report success or failure after Producer.send() returns. "
+ + "Producer may
report failure to send a message earlier than this config if all the retries
are exhausted or "
+ + "a record is added
to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be
equal to or "
+ + "greater than " +
REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
+
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -188,12 +201,6 @@ public class ProducerConfig extends AbstractConfig {
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
- /** <code>request.timeout.ms</code> */
- public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
- private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
- + " This should be larger than replica.lag.time.max.ms (a broker configuration)"
- + " to reduce the possibility of message duplication due to unnecessary producer retries.";
-
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
@@ -224,7 +231,7 @@ public class ProducerConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
- .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
+ .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
@@ -233,7 +240,8 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
- .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
+ .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+ .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index ea0f0f7..6e08185 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -77,13 +76,13 @@ public final class ProducerBatch {
private boolean retry;
private boolean reopened = false;
- public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
- this(tp, recordsBuilder, now, false);
+ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
+ this(tp, recordsBuilder, createdMs, false);
}
- public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
- this.createdMs = now;
- this.lastAttemptMs = now;
+ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
+ this.createdMs = createdMs;
+ this.lastAttemptMs = createdMs;
this.recordsBuilder = recordsBuilder;
this.topicPartition = tp;
this.lastAppendTime = createdMs;
@@ -158,7 +157,17 @@ public final class ProducerBatch {
}
/**
- * Complete the request. If the batch was previously aborted, this is a no-op.
+ * Finalize the state of a batch. Final state, once set, is immutable. This function may be called
+ * once or twice on a batch. It may be called twice if
+ * 1. An inflight batch expires before a response from the broker is received. The batch's final
+ * state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
+ * try to set SUCCEEDED final state.
+ * 2. If a transaction abortion happens or if the producer is closed forcefully, the final state is
+ * ABORTED but again it could succeed if broker responds with a success.
+ *
+ * Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
+ * Attempted transitions from one failure state to the same or a different failed state are ignored.
+ * Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
*
* @param baseOffset The base offset of the messages assigned by the server
* @param logAppendTime The log append time or -1 if CreateTime is being used
@@ -166,26 +175,34 @@ public final class ProducerBatch {
* @return true if the batch was completed successfully and false if the batch was previously aborted
*/
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
- final FinalState finalState;
- if (exception == null) {
+ final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
+
+ if (tryFinalState == FinalState.SUCCEEDED) {
log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
- finalState = FinalState.SUCCEEDED;
} else {
- log.trace("Failed to produce messages to {}.", topicPartition, exception);
- finalState = FinalState.FAILED;
+ log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+ }
+
+ if (this.finalState.compareAndSet(null, tryFinalState)) {
+ completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+ return true;
}
- if (!this.finalState.compareAndSet(null, finalState)) {
- if (this.finalState.get() == FinalState.ABORTED) {
- log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
- return false;
+ if (this.finalState.get() != FinalState.SUCCEEDED) {
+ if (tryFinalState == FinalState.SUCCEEDED) {
+ // Log if a previously unsuccessful batch succeeded later on.
+ log.debug("ProduceResponse returned {} for {} after batch with base offset {} had already been {}.",
+ tryFinalState, topicPartition, baseOffset, this.finalState.get());
} else {
- throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
+ // FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
+ log.debug("Ignored state transition {} -> {} for {} batch with base offset {}",
+ this.finalState.get(), tryFinalState, topicPartition, baseOffset);
}
+ } else {
+ // A SUCCESSFUL batch must not attempt another state change.
+ throw new IllegalStateException("A " + this.finalState.get() + " batch must not attempt another state change to " + tryFinalState);
}
-
- completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
- return true;
+ return false;
}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
@@ -299,37 +316,12 @@ public final class ProducerBatch {
return "ProducerBatch(topicPartition=" + topicPartition + ",
recordCount=" + recordCount + ")";
}
- /**
- * A batch whose metadata is not available should be expired if one of the
following is true:
- * <ol>
- * <li> the batch is not in retry AND request timeout has elapsed
after it is ready (full or linger.ms has reached).
- * <li> the batch is in retry AND request timeout has elapsed after
the backoff period ended.
- * </ol>
- * This methods closes this batch and sets {@code expiryErrorMessage} if
the batch has timed out.
- */
- boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now,
long lingerMs, boolean isFull) {
- if (!this.inRetry() && isFull && requestTimeoutMs < (now -
this.lastAppendTime))
- expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed
since last append";
- else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) -
lingerMs))
- expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has
passed since batch creation plus linger time";
- else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) -
retryBackoffMs))
- expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms
has passed since last attempt plus backoff time";
-
- boolean expired = expiryErrorMessage != null;
- if (expired)
- abortRecordAppends();
- return expired;
+ boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
+ return deliveryTimeoutMs <= now - this.createdMs;
}
- /**
- * If {@link #maybeExpire(int, long, long, long, boolean)} returned true,
the sender will fail the batch with
- * the exception returned by this method.
- * @return An exception indicating the batch expired.
- */
- TimeoutException timeoutException() {
- if (expiryErrorMessage == null)
- throw new IllegalStateException("Batch has not expired");
- return new TimeoutException("Expiring " + recordCount + " record(s)
for " + topicPartition + ": " + expiryErrorMessage);
+ public FinalState finalState() {
+ return this.finalState.get();
}
int attempts() {
@@ -347,10 +339,6 @@ public final class ProducerBatch {
return drainedMs - createdMs;
}
- long createdTimeMs(long nowMs) {
- return Math.max(0, nowMs - createdMs);
- }
-
long waitedTimeMs(long nowMs) {
return Math.max(0, nowMs - lastAttemptMs);
}
@@ -467,5 +455,4 @@ public final class ProducerBatch {
public boolean sequenceHasBeenReset() {
return reopened;
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 31c6d75..964ac3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,18 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
@@ -34,10 +46,10 @@ import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.LogContext;
@@ -45,20 +57,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
@@ -76,6 +74,7 @@ public final class RecordAccumulator {
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
+ private final long deliveryTimeoutMs;
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
@@ -85,13 +84,13 @@ public final class RecordAccumulator {
private final Map<TopicPartition, Long> muted;
private int drainIndex;
private final TransactionManager transactionManager;
+ private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
/**
* Create a new record accumulator
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
- * @param totalSize The maximum memory the record accumulator can use.
* @param compression The compression codec for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
@@ -106,14 +105,16 @@ public final class RecordAccumulator {
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
- long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
+ long deliveryTimeoutMs,
Metrics metrics,
+ String metricGrpName,
Time time,
ApiVersions apiVersions,
- TransactionManager transactionManager) {
+ TransactionManager transactionManager,
+ BufferPool bufferPool) {
this.log = logContext.logger(RecordAccumulator.class);
this.drainIndex = 0;
this.closed = false;
@@ -123,9 +124,9 @@ public final class RecordAccumulator {
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
+ this.deliveryTimeoutMs = deliveryTimeoutMs;
this.batches = new CopyOnWriteMap<>();
- String metricGrpName = "producer-metrics";
- this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+ this.free = bufferPool;
this.incomplete = new IncompleteBatches();
this.muted = new HashMap<>();
this.time = time;
@@ -227,7 +228,6 @@ public final class RecordAccumulator {
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
-
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
@@ -240,7 +240,7 @@ public final class RecordAccumulator {
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
- "support the required message format (v2). The broker must be version 0.11 or later.");
+ "support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
@@ -273,37 +273,35 @@ public final class RecordAccumulator {
return result;
}
+ public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
+ if (batch.createdMs + deliveryTimeoutMs > 0) {
+ // the non-negative check is to guard us against potential overflow due to setting
+ // a large value for deliveryTimeoutMs
+ nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, batch.createdMs + deliveryTimeoutMs);
+ } else {
+ log.warn("Skipping next batch expiry time update due to addition overflow: "
+ + "batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, deliveryTimeoutMs);
+ }
+ }
+
/**
* Get a list of batches which have been sitting in the accumulator too long and need to be expired.
*/
- public List<ProducerBatch> expiredBatches(int requestTimeout, long now) {
+ public List<ProducerBatch> expiredBatches(long now) {
List<ProducerBatch> expiredBatches = new ArrayList<>();
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
- Deque<ProducerBatch> dq = entry.getValue();
- TopicPartition tp = entry.getKey();
- // We only check if the batch should be expired if the partition does not have a batch in flight.
- // This is to prevent later batches from being expired while an earlier batch is still in progress.
- // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
- // is only active in this case. Otherwise the expiration order is not guaranteed.
- if (!isMuted(tp, now)) {
- synchronized (dq) {
- // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
- ProducerBatch lastBatch = dq.peekLast();
- Iterator<ProducerBatch> batchIterator = dq.iterator();
- while (batchIterator.hasNext()) {
- ProducerBatch batch = batchIterator.next();
- boolean isFull = batch != lastBatch || batch.isFull();
- // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
- // are invoked after completing the iterations, since sends invoked from callbacks
- // may append more batches to the deque being iterated. The batch is deallocated after
- // callbacks are invoked.
- if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
- expiredBatches.add(batch);
- batchIterator.remove();
- } else {
- // Stop at the first batch that has not expired.
- break;
- }
+ // expire the batches in the order of sending
+ Deque<ProducerBatch> deque = entry.getValue();
+ synchronized (deque) {
+ while (!deque.isEmpty()) {
+ ProducerBatch batch = deque.getFirst();
+ if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
+ deque.poll();
+ batch.abortRecordAppends();
+ expiredBatches.add(batch);
+ } else {
+ maybeUpdateNextBatchExpiryTime(batch);
+ break;
}
}
}
@@ -311,8 +309,13 @@ public final class RecordAccumulator {
return expiredBatches;
}
+ public long getDeliveryTimeoutMs() {
+ return deliveryTimeoutMs;
+ }
+
/**
- * Re-enqueue the given record batch in the accumulator to retry
+ * Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check
+ * whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here.
*/
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
@@ -356,8 +359,8 @@ public final class RecordAccumulator {
}
// We will have to do extra work to ensure the queue is in order when requests are being retried and there are
- // multiple requests in flight to that partition. If the first inflight request fails to append, then all the subsequent
- // in flight requests will also fail because the sequence numbers will not be accepted.
+ // multiple requests in flight to that partition. If the first in flight request fails to append, then all the
+ // subsequent in flight requests will also fail because the sequence numbers will not be accepted.
//
// Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when
// the subsequent batches come back in sequence order, they will have to be placed further back in the queue.
@@ -368,12 +371,12 @@ public final class RecordAccumulator {
private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
// When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
- throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " +
- "though idempotence is enabled.");
+ throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " +
+ "though idempotency is enabled.");
if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
- throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
- "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
+ throw new IllegalStateException("We are re-enqueueing a batch which is not tracked as part of the in flight " +
+ "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
ProducerBatch firstBatchInQueue = deque.peekFirst();
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) {
@@ -390,7 +393,7 @@ public final class RecordAccumulator {
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
- "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
+ "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
// Either we have reached a point where there are batches without a sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming batch.
@@ -466,7 +469,6 @@ public final class RecordAccumulator {
}
}
}
-
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs,
unknownLeaderTopics);
}
@@ -484,6 +486,106 @@ public final class RecordAccumulator {
return false;
}
+ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first,
TopicPartition tp) {
+ ProducerIdAndEpoch producerIdAndEpoch = null;
+ if (transactionManager != null) {
+ if (!transactionManager.isSendToPartitionAllowed(tp))
+ return true;
+
+ producerIdAndEpoch = transactionManager.producerIdAndEpoch();
+ if (!producerIdAndEpoch.isValid())
+ // we cannot send the batch until we have refreshed the
producer id
+ return true;
+
+ if (!first.hasSequence() &&
transactionManager.hasUnresolvedSequence(first.topicPartition))
+ // Don't drain any new batches while the state of previous
sequence numbers
+ // is unknown. The previous batches would be unknown if they
were aborted
+ // on the client after being sent to the broker at least once.
+ return true;
+
+ int firstInFlightSequence =
transactionManager.firstInFlightSequence(first.topicPartition);
+ if (firstInFlightSequence != RecordBatch.NO_SEQUENCE &&
first.hasSequence()
+ && first.baseSequence() != firstInFlightSequence)
+ // If the queued batch already has an assigned sequence, then
it is being retried.
+ // In this case, we wait until the next immediate batch is
ready and drain that.
+ // We only move on when the next in line batch is complete
(either successfully or due to
+ // a fatal broker error). This effectively reduces our in
flight request count to 1.
+ return true;
+ }
+ return false;
+ }
+
+ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node
node, int maxSize, long now) {
+ int size = 0;
+ List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+ List<ProducerBatch> ready = new ArrayList<>();
+ /* to make starvation less likely this loop doesn't start at 0 */
+ int start = drainIndex = drainIndex % parts.size();
+ do {
+ PartitionInfo part = parts.get(drainIndex);
+ TopicPartition tp = new TopicPartition(part.topic(),
part.partition());
+ this.drainIndex = (this.drainIndex + 1) % parts.size();
+
+ // Only proceed if the partition has no in-flight batches.
+ if (isMuted(tp, now))
+ continue;
+
+ Deque<ProducerBatch> deque = getDeque(tp);
+ if (deque == null)
+ continue;
+
+ synchronized (deque) {
+ // invariant: !isMuted(tp,now) && deque != null
+ ProducerBatch first = deque.peekFirst();
+ if (first == null)
+ continue;
+
+ // first != null
+ boolean backoff = first.attempts() > 0 &&
first.waitedTimeMs(now) < retryBackoffMs;
+ // Only drain the batch if it is not during backoff period.
+ if (backoff)
+ continue;
+
+ if (size + first.estimatedSizeInBytes() > maxSize &&
!ready.isEmpty()) {
+ // there is a rare case that a single batch size is larger
than the request size due to
+ // compression; in this case we will still eventually send
this batch in a single request
+ break;
+ } else {
+ if (shouldStopDrainBatchesForPartition(first, tp))
+ break;
+
+ boolean isTransactional = transactionManager != null ?
transactionManager.isTransactional() : false;
+ ProducerIdAndEpoch producerIdAndEpoch =
+ transactionManager != null ?
transactionManager.producerIdAndEpoch() : null;
+ ProducerBatch batch = deque.pollFirst();
+ if (producerIdAndEpoch != null && !batch.hasSequence()) {
+ // If the batch already has an assigned sequence, then
we should not change the producer id and
+ // sequence number, since this may introduce
duplicates. In particular, the previous attempt
+ // may actually have been accepted, and if we change
the producer id and sequence here, this
+ // attempt will also be accepted, causing a duplicate.
+ //
+ // Additionally, we update the next sequence number
bound for the partition, and also have
+ // the transaction manager track the batch so as to
ensure that sequence ordering is maintained
+ // even if we receive out of order responses.
+ batch.setProducerState(producerIdAndEpoch,
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition,
batch.recordCount);
+ log.debug("Assigned producerId {} and producerEpoch {}
to batch with base sequence " +
+ "{} being sent to partition {}",
producerIdAndEpoch.producerId,
+ producerIdAndEpoch.epoch, batch.baseSequence(),
tp);
+
+ transactionManager.addInFlightBatch(batch);
+ }
+ batch.close();
+ size += batch.records().sizeInBytes();
+ ready.add(batch);
+
+ batch.drained(now);
+ }
+ }
+ } while (start != drainIndex);
+ return ready;
+ }
+
/**
* Drain all the data for the given nodes and collate them into a list of
batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the
same topic-node over and over.
@@ -494,106 +596,25 @@ public final class RecordAccumulator {
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with
total size less than the requested maxSize.
*/
- public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
- Set<Node> nodes,
- int maxSize,
- long now) {
+ public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node>
nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
- int size = 0;
- List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
- List<ProducerBatch> ready = new ArrayList<>();
- /* to make starvation less likely this loop doesn't start at 0 */
- int start = drainIndex = drainIndex % parts.size();
- do {
- PartitionInfo part = parts.get(drainIndex);
- TopicPartition tp = new TopicPartition(part.topic(),
part.partition());
- // Only proceed if the partition has no in-flight batches.
- if (!isMuted(tp, now)) {
- Deque<ProducerBatch> deque = getDeque(tp);
- if (deque != null) {
- synchronized (deque) {
- ProducerBatch first = deque.peekFirst();
- if (first != null) {
- boolean backoff = first.attempts() > 0 &&
first.waitedTimeMs(now) < retryBackoffMs;
- // Only drain the batch if it is not during
backoff period.
- if (!backoff) {
- if (size + first.estimatedSizeInBytes() >
maxSize && !ready.isEmpty()) {
- // there is a rare case that a single
batch size is larger than the request size due
- // to compression; in this case we
will still eventually send this batch in a single
- // request
- break;
- } else {
- ProducerIdAndEpoch producerIdAndEpoch
= null;
- boolean isTransactional = false;
- if (transactionManager != null) {
- if
(!transactionManager.isSendToPartitionAllowed(tp))
- break;
-
- producerIdAndEpoch =
transactionManager.producerIdAndEpoch();
- if (!producerIdAndEpoch.isValid())
- // we cannot send the batch
until we have refreshed the producer id
- break;
-
- isTransactional =
transactionManager.isTransactional();
-
- if (!first.hasSequence() &&
transactionManager.hasUnresolvedSequence(first.topicPartition))
- // Don't drain any new batches
while the state of previous sequence numbers
- // is unknown. The previous
batches would be unknown if they were aborted
- // on the client after being
sent to the broker at least once.
- break;
-
- int firstInFlightSequence =
transactionManager.firstInFlightSequence(first.topicPartition);
- if (firstInFlightSequence !=
RecordBatch.NO_SEQUENCE && first.hasSequence()
- && first.baseSequence() !=
firstInFlightSequence)
- // If the queued batch already
has an assigned sequence, then it is being
- // retried. In this case, we
wait until the next immediate batch is ready
- // and drain that. We only
move on when the next in line batch is complete (either successfully
- // or due to a fatal broker
error). This effectively reduces our
- // in flight request count to
1.
- break;
- }
-
- ProducerBatch batch =
deque.pollFirst();
- if (producerIdAndEpoch != null &&
!batch.hasSequence()) {
- // If the batch already has an
assigned sequence, then we should not change the producer id and
- // sequence number, since this may
introduce duplicates. In particular,
- // the previous attempt may
actually have been accepted, and if we change
- // the producer id and sequence
here, this attempt will also be accepted,
- // causing a duplicate.
- //
- // Additionally, we update the
next sequence number bound for the partition,
- // and also have the transaction
manager track the batch so as to ensure
- // that sequence ordering is
maintained even if we receive out of order
- // responses.
-
batch.setProducerState(producerIdAndEpoch,
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition,
batch.recordCount);
- log.debug("Assigned producerId {}
and producerEpoch {} to batch with base sequence " +
- "{} being sent to
partition {}", producerIdAndEpoch.producerId,
- producerIdAndEpoch.epoch,
batch.baseSequence(), tp);
-
-
transactionManager.addInFlightBatch(batch);
- }
- batch.close();
- size += batch.records().sizeInBytes();
- ready.add(batch);
- batch.drained(now);
- }
- }
- }
- }
- }
- }
- this.drainIndex = (this.drainIndex + 1) % parts.size();
- } while (start != drainIndex);
+ List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node,
maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
+ /**
+ * The earliest absolute time a batch will expire (in milliseconds)
+ */
+ public Long nextExpiryTimeMs() {
+ return this.nextBatchExpiryTimeMs;
+ }
+
private Deque<ProducerBatch> getDeque(TopicPartition tp) {
return batches.get(tp);
}
@@ -784,5 +805,4 @@ public final class RecordAccumulator {
this.unknownLeaderTopics = unknownLeaderTopics;
}
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d1a7bc9..7077f15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.ArrayList;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -120,6 +122,9 @@ public class Sender implements Runnable {
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager;
+ // A per-partition queue of batches ordered by creation time for tracking the in-flight batches
+ private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
+
public Sender(LogContext logContext,
KafkaClient client,
Metadata metadata,
@@ -149,6 +154,73 @@ public class Sender implements Runnable {
this.retryBackoffMs = retryBackoffMs;
this.apiVersions = apiVersions;
this.transactionManager = transactionManager;
+ this.inFlightBatches = new HashMap<>();
+ }
+
+ public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
+ return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>();
+ }
+
+ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+ List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
+ if (batches != null) {
+ batches.remove(batch);
+ if (batches.isEmpty()) {
+ inFlightBatches.remove(batch.topicPartition);
+ }
+ }
+ }
+
+ /**
+ * Get the in-flight batches that have reached the delivery timeout.
+ */
+ private List<ProducerBatch> getExpiredInflightBatches(long now) {
+ List<ProducerBatch> expiredBatches = new ArrayList<>();
+ for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : inFlightBatches.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+ List<ProducerBatch> partitionInFlightBatches = entry.getValue();
+ if (partitionInFlightBatches != null) {
+ Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
+ while (iter.hasNext()) {
+ ProducerBatch batch = iter.next();
+ if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
+ iter.remove();
+ // expireBatches is called in Sender.sendProducerData, before client.poll.
+ // The batch.finalState() == null invariant should always hold. An IllegalStateException
+ // exception will be thrown if the invariant is violated.
+ if (batch.finalState() == null) {
+ expiredBatches.add(batch);
+ } else {
+ throw new IllegalStateException(batch.topicPartition + " batch created at " +
+ batch.createdMs + " gets unexpected final state " + batch.finalState());
+ }
+ } else {
+ accumulator.maybeUpdateNextBatchExpiryTime(batch);
+ break;
+ }
+ }
+ if (partitionInFlightBatches.isEmpty())
+ inFlightBatches.remove(topicPartition);
+ }
+ }
+ return expiredBatches;
+ }
+
+ private void addToInflightBatches(List<ProducerBatch> batches) {
+ for (ProducerBatch batch : batches) {
+ List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
+ if (inflightBatchList == null) {
+ inflightBatchList = new ArrayList<>();
+ inFlightBatches.put(batch.topicPartition, inflightBatchList);
+ }
+ inflightBatchList.add(batch);
+ }
+ }
+
+ public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
+ for (List<ProducerBatch> batchList : batches.values()) {
+ addToInflightBatches(batchList);
+ }
}
/**
@@ -204,12 +276,12 @@ public class Sender implements Runnable {
if
(transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which
requires a reset of the producer state.
transactionManager.resetProducerId();
-
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a
producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() &&
!transactionManager.hasFatalError()) {
- transactionManager.transitionToFatalError(new
KafkaException("The client hasn't received acknowledgment for " +
+ transactionManager.transitionToFatalError(
+ new KafkaException("The client hasn't received
acknowledgment for " +
"some previously sent messages and can no longer
retry them. It isn't safe to continue."));
} else if
(transactionManager.hasInFlightTransactionalRequest() ||
maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional
requests, we simply wait for them to return
@@ -241,7 +313,6 @@ public class Sender implements Runnable {
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
-
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result =
this.accumulator.ready(cluster, now);
@@ -253,8 +324,8 @@ public class Sender implements Runnable {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
- log.debug("Requesting metadata update due to unknown leader topics
from the batched records: {}", result.unknownLeaderTopics);
-
+ log.debug("Requesting metadata update due to unknown leader topics
from the batched records: {}",
+ result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
@@ -270,8 +341,8 @@ public class Sender implements Runnable {
}
// create produce requests
- Map<Integer, List<ProducerBatch>> batches =
this.accumulator.drain(cluster, result.readyNodes,
- this.maxRequestSize, now);
+ Map<Integer, List<ProducerBatch>> batches =
this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+ addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
@@ -280,27 +351,34 @@ public class Sender implements Runnable {
}
}
- List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now);
+ List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
+ List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
+ expiredBatches.addAll(expiredInflightBatches);
+
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
- failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
+ String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+ failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
-
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
- // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
- // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
- // with sendable data that aren't ready to send since they would cause busy looping.
+ // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
+ // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
+ // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
+ // that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
+ pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
+ pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select
time would be 0;
@@ -310,7 +388,6 @@ public class Sender implements Runnable {
pollTimeout = 0;
}
sendProduceRequests(batches, now);
-
return pollTimeout;
}
@@ -318,7 +395,6 @@ public class Sender implements Runnable {
if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
if (transactionManager.isAborting())
accumulator.abortUndrainedBatches(new KafkaException("Failing
batch since transaction was aborted"));
-
// There may still be requests left which are being retried. Since
we do not know whether they had
// been successfully appended to the broker log, we must resend
them until their final status is clear.
// If they had been appended and we did not receive the error,
then our sequence number would no longer
@@ -341,7 +417,6 @@ public class Sender implements Runnable {
transactionManager.lookupCoordinator(nextRequestHandler);
break;
}
-
if (!NetworkClientUtils.awaitReady(client, targetNode,
time, requestTimeoutMs)) {
transactionManager.lookupCoordinator(nextRequestHandler);
break;
@@ -353,12 +428,10 @@ public class Sender implements Runnable {
if (targetNode != null) {
if (nextRequestHandler.isRetry())
time.sleep(nextRequestHandler.retryBackoffMs());
-
- ClientRequest clientRequest =
client.newClientRequest(targetNode.idString(),
- requestBuilder, now, true, requestTimeoutMs,
nextRequestHandler);
+ ClientRequest clientRequest = client.newClientRequest(
+ targetNode.idString(), requestBuilder, now, true,
requestTimeoutMs, nextRequestHandler);
transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
log.debug("Sending transactional request {} to node {}",
requestBuilder, targetNode);
-
client.send(clientRequest, now);
return true;
}
@@ -371,11 +444,9 @@ public class Sender implements Runnable {
break;
}
}
-
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
-
transactionManager.retry(nextRequestHandler);
return true;
}
@@ -442,8 +513,7 @@ public class Sender implements Runnable {
break;
}
} else {
- log.debug("Could not find an available broker to send
InitProducerIdRequest to. " +
- "We will back off and try again.");
+ log.debug("Could not find an available broker to send
InitProducerIdRequest to. Will back off and retry.");
}
} catch (UnsupportedVersionException e) {
transactionManager.transitionToFatalError(e);
@@ -466,7 +536,7 @@ public class Sender implements Runnable {
int correlationId = requestHeader.correlationId();
if (response.wasDisconnected()) {
log.trace("Cancelled request with header {} due to node {} being
disconnected",
- requestHeader, response.destination());
+ requestHeader, response.destination());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new
ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId,
now, 0L);
} else if (response.versionMismatch() != null) {
@@ -511,23 +581,25 @@ public class Sender implements Runnable {
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 ||
batch.isCompressed())) {
// If the batch is too large, we split the batch and send the
split batches again. We do not decrement
// the retry attempts in this case.
- log.warn("Got error produce response in correlation id {} on
topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
- correlationId,
- batch.topicPartition,
- this.retries - batch.attempts(),
- error);
+ log.warn(
+ "Got error produce response in correlation id {} on
topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
+ correlationId,
+ batch.topicPartition,
+ this.retries - batch.attempts(),
+ error);
if (transactionManager != null)
transactionManager.removeInFlightBatch(batch);
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
- if (canRetry(batch, response)) {
- log.warn("Got error produce response with correlation id {} on
topic-partition {}, retrying ({} attempts left). Error: {}",
- correlationId,
- batch.topicPartition,
- this.retries - batch.attempts() - 1,
- error);
+ if (canRetry(batch, response, now)) {
+ log.warn(
+ "Got error produce response with correlation id {} on
topic-partition {}, retrying ({} attempts left). Error: {}",
+ correlationId,
+ batch.topicPartition,
+ this.retries - batch.attempts() - 1,
+ error);
if (transactionManager == null) {
reenqueueBatch(batch, now);
} else if
(transactionManager.hasProducerIdAndEpoch(batch.producerId(),
batch.producerEpoch())) {
@@ -564,14 +636,14 @@ public class Sender implements Runnable {
if (error.exception() instanceof InvalidMetadataException) {
if (error.exception() instanceof
UnknownTopicOrPartitionException) {
log.warn("Received unknown topic or partition error in
produce request on partition {}. The " +
- "topic/partition may not exist or the user may not
have Describe access to it", batch.topicPartition);
+ "topic-partition may not exist or the user may not
have Describe access to it",
+ batch.topicPartition);
} else {
log.warn("Received invalid metadata error in produce
request on partition {} due to {}. Going " +
"to request metadata update now",
batch.topicPartition, error.exception().toString());
}
metadata.requestUpdate();
}
-
} else {
completeBatch(batch, response);
}
@@ -583,35 +655,43 @@ public class Sender implements Runnable {
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
this.accumulator.reenqueue(batch, currentTimeMs);
+ maybeRemoveFromInflightBatches(batch);
this.sensors.recordRetries(batch.topicPartition.topic(),
batch.recordCount);
}
private void completeBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response) {
if (transactionManager != null) {
if (transactionManager.hasProducerIdAndEpoch(batch.producerId(),
batch.producerEpoch())) {
-
transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition,
batch.baseSequence() + batch.recordCount - 1);
- log.debug("ProducerId: {}; Set last ack'd sequence number for
topic-partition {} to {}", batch.producerId(), batch.topicPartition,
-
transactionManager.lastAckedSequence(batch.topicPartition));
+ transactionManager
+ .maybeUpdateLastAckedSequence(batch.topicPartition,
batch.baseSequence() + batch.recordCount - 1);
+ log.debug("ProducerId: {}; Set last ack'd sequence number for
topic-partition {} to {}",
+ batch.producerId(),
+ batch.topicPartition,
+
transactionManager.lastAckedSequence(batch.topicPartition));
}
transactionManager.updateLastAckedOffset(response, batch);
transactionManager.removeInFlightBatch(batch);
}
- if (batch.done(response.baseOffset, response.logAppendTime, null))
+ if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+ maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
+ }
}
- private void failBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response, RuntimeException exception, boolean
adjustSequenceNumbers) {
+ private void failBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response, RuntimeException exception,
+ boolean adjustSequenceNumbers) {
failBatch(batch, response.baseOffset, response.logAppendTime,
exception, adjustSequenceNumbers);
}
- private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
+ private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception,
+ boolean adjustSequenceNumbers) {
if (transactionManager != null) {
if (exception instanceof OutOfOrderSequenceException
&& !transactionManager.isTransactional()
&& transactionManager.hasProducerId(batch.producerId())) {
log.error("The broker returned {} for topic-partition " +
- "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+ "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
exception, batch.topicPartition, baseOffset);
// Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
@@ -633,19 +713,23 @@ public class Sender implements Runnable {
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
- if (batch.done(baseOffset, logAppendTime, exception))
+ if (batch.done(baseOffset, logAppendTime, exception)) {
+ maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
+ }
}
/**
* We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed.
- * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future
- * batches are certain to fail with an OutOfOrderSequence exception.
+ * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the
+ * future batches are certain to fail with an OutOfOrderSequence exception.
*/
- private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
- return batch.attempts() < this.retries &&
- ((response.error.exception() instanceof RetriableException) ||
- (transactionManager != null && transactionManager.canRetry(response, batch)));
+ private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
+ return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
+ batch.attempts() < this.retries &&
+ batch.finalState() == null &&
+ ((response.error.exception() instanceof RetriableException) ||
+ (transactionManager != null && transactionManager.canRetry(response, batch)));
}
/**
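The net effect of the canRetry() change above: a retriable broker error alone no longer guarantees a retry. The batch must also still be within delivery.timeout.ms and must not already have a final state. A minimal, self-contained sketch of that gate (hypothetical parameter names; the real Sender reads these values from the batch, the accumulator and the response, and additionally consults the TransactionManager):

// Sketch only: mirrors the three new conditions in canRetry(), with assumed
// hasReachedDeliveryTimeout semantics of "elapsed time >= delivery.timeout.ms".
final class RetryGateSketch {
    static boolean canRetry(long createdMs, long nowMs, long deliveryTimeoutMs,
                            int attempts, int maxRetries,
                            boolean hasFinalState, boolean errorIsRetriable) {
        boolean reachedDeliveryTimeout = nowMs - createdMs >= deliveryTimeoutMs;
        return !reachedDeliveryTimeout && attempts < maxRetries && !hasFinalState && errorIsRetriable;
    }

    public static void main(String[] args) {
        // Retriable error, but the delivery timeout has already elapsed: the batch is failed, not retried.
        System.out.println(canRetry(0L, 120_000L, 120_000L, 1, 5, false, true)); // false
        // Same error well within the delivery timeout: the batch is re-enqueued for another attempt.
        System.out.println(canRetry(0L, 10_000L, 120_000L, 1, 5, false, true));  // true
    }
}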
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 1713d78..4f8420b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -65,7 +65,7 @@ public class AbstractConfig {
this.values.put(update.getKey(), update.getValue());
}
definition.parse(this.values);
- this.used = Collections.synchronizedSet(new HashSet<String>());
+ this.used = Collections.synchronizedSet(new HashSet<>());
this.definition = definition;
if (doLog)
logAll();
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c9efb82..12e467c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -57,7 +57,7 @@ import java.util.Set;
* Map<String, String> props = new HashMap<>();
* props.put("config_with_default", "some value");
* props.put("config_with_dependents", "some other value");
- *
+ *
* Map<String, Object> configs = defs.parse(props);
* // will return "some value"
* String someConfig = (String) configs.get("config_with_default");
@@ -595,10 +595,8 @@ public class ConfigDef {
if (!configKeys.containsKey(name)) {
return;
}
-
ConfigKey key = configKeys.get(name);
ConfigValue value = configs.get(name);
-
if (key.recommender != null) {
try {
List<Object> recommendedValues =
key.recommender.validValues(name, parsed);
@@ -845,6 +843,11 @@ public class ConfigDef {
private final Number min;
private final Number max;
+ /**
+ * A numeric range with inclusive upper bound and inclusive lower bound
+ * @param min the lower bound
+ * @param max the upper bound
+ */
private Range(Number min, Number max) {
this.min = min;
this.max = max;
@@ -860,7 +863,7 @@ public class ConfigDef {
}
/**
- * A numeric range that checks both the upper and lower bound
+ * A numeric range that checks both the upper (inclusive) and lower bound
*/
public static Range between(Number min, Number max) {
return new Range(min, max);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6b41a9e..a586af8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -280,7 +280,6 @@ public class MockClient implements KafkaClient {
checkTimeoutOfPendingRequests(now);
List<ClientResponse> copy = new ArrayList<>(this.responses);
-
if (metadata != null && metadata.updateRequested()) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (cluster != null)
@@ -351,7 +350,9 @@ public class MockClient implements KafkaClient {
public void respond(AbstractResponse response, boolean disconnected) {
- ClientRequest request = requests.remove();
+ ClientRequest request = null;
+ if (requests.size() > 0)
+ request = requests.remove();
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c83fe06..634a1ab 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1679,7 +1679,7 @@ public class KafkaConsumerTest {
}
private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long>
offsets) {
- return listOffsetsResponse(offsets, Collections.<TopicPartition,
Errors>emptyMap());
+ return listOffsetsResponse(offsets, Collections.emptyMap());
}
private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long>
partitionOffsets,
@@ -1818,7 +1818,7 @@ public class KafkaConsumerTest {
requestTimeoutMs,
IsolationLevel.READ_UNCOMMITTED);
- return new KafkaConsumer<String, String>(
+ return new KafkaConsumer<>(
loggerFactory,
clientId,
consumerCoordinator,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2f89d79..6a85449 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -226,40 +226,30 @@ public class ProducerBatchTest {
}
/**
- * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
- * time is interpreted correctly as not expired when the linger time is larger than the difference
- * between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
+ * A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly
+ * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}.
*/
@Test
- public void testLargeLingerOldNowExpire() {
+ public void testBatchExpiration() {
+ long deliveryTimeoutMs = 10240;
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
// Set `now` to 2ms before the create time.
- assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
+ assertFalse(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now - 2));
+ // Set `now` to deliveryTimeoutMs.
+ assertTrue(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now + deliveryTimeoutMs));
}
/**
- * A {@link ProducerBatch} configured using a very large retryBackoff value with retry = true and a timestamp
- * preceding its create time is interpreted correctly as not expired when the retryBackoff time is larger than the
- * difference between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
+ * A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly
+ * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}.
*/
@Test
- public void testLargeRetryBackoffOldNowExpire() {
+ public void testBatchExpirationAfterReenqueue() {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
// Set batch.retry = true
batch.reenqueued(now);
// Set `now` to 2ms before the create time.
- assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
- }
-
- /**
- * A {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)} call with a now value before the create
- * time of the ProducerBatch is correctly recognized as not expired when invoked with parameter isFull = true.
- */
- @Test
- public void testLargeFullOldNowExpire() {
- ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
- // Set `now` to 2ms before the create time.
- assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+ assertFalse(batch.hasReachedDeliveryTimeout(10240, now - 2L));
}
@Test
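These tests pin down the semantics of the new expiry check: a batch has reached its delivery timeout once at least deliveryTimeoutMs has elapsed since the batch was created, independent of linger, retry backoff, or whether the batch is full. A sketch of that assumed check, consistent with the assertions above (createdMs is a hypothetical stand-in for the batch's create time):

// Assumed behaviour of ProducerBatch#hasReachedDeliveryTimeout(long, long); sketch only.
final class DeliveryTimeoutSketch {
    static boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long nowMs, long createdMs) {
        return deliveryTimeoutMs <= nowMs - createdMs;
    }

    public static void main(String[] args) {
        long createdMs = 1_000L;
        System.out.println(hasReachedDeliveryTimeout(10240L, createdMs - 2, createdMs));      // false
        System.out.println(hasReachedDeliveryTimeout(10240L, createdMs + 10240L, createdMs)); // true
    }
}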
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5f48410..13b0d1b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -83,10 +83,9 @@ public class RecordAccumulatorTest {
private MockTime time = new MockTime();
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
- private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length,
value.length,
- Record.EMPTY_HEADERS);
+ private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length,
value.length, Record.EMPTY_HEADERS);
private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2, part3),
- Collections.<String>emptySet(), Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
private Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 1000;
private final LogContext logContext = new LogContext();
@@ -255,7 +254,7 @@ public class RecordAccumulatorTest {
final int msgs = 10000;
final int numParts = 2;
final RecordAccumulator accum = createTestRecordAccumulator(
- 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0L);
+ 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0L);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
@@ -299,8 +298,8 @@ public class RecordAccumulatorTest {
// test case assumes that the records do not fill the batch completely
int batchSize = 1025;
- RecordAccumulator accum = createTestRecordAccumulator(
- batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 *
batchSize, CompressionType.NONE, lingerMs);
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize +
DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+ 10 * batchSize, CompressionType.NONE, lingerMs);
// Just short of going over the limit so we trigger linger time
int appends = expectedNumAppends(batchSize);
@@ -332,10 +331,17 @@ public class RecordAccumulatorTest {
@Test
public void testRetryBackoff() throws Exception {
- long lingerMs = Long.MAX_VALUE / 4;
- long retryBackoffMs = Long.MAX_VALUE / 2;
- final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
+ long lingerMs = Integer.MAX_VALUE / 16;
+ long retryBackoffMs = Integer.MAX_VALUE / 8;
+ int requestTimeoutMs = Integer.MAX_VALUE / 4;
+ long deliveryTimeoutMs = Integer.MAX_VALUE;
+ long totalSize = 10 * 1024;
+ int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+ String metricGrpName = "producer-metrics";
+
+ final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
+ CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
+ new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds();
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -371,7 +377,7 @@ public class RecordAccumulatorTest {
@Test
public void testFlush() throws Exception {
- long lingerMs = Long.MAX_VALUE;
+ long lingerMs = Integer.MAX_VALUE;
final RecordAccumulator accum = createTestRecordAccumulator(
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 *
1024, CompressionType.NONE, lingerMs);
@@ -413,7 +419,7 @@ public class RecordAccumulatorTest {
@Test
public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = createTestRecordAccumulator(
- 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 *
1024, CompressionType.NONE, Long.MAX_VALUE);
+ 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, Long.MAX_VALUE);
accum.append(new TopicPartition(topic, 0), 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs);
accum.beginFlush();
@@ -429,12 +435,12 @@ public class RecordAccumulatorTest {
@Test
public void testAbortIncompleteBatches() throws Exception {
- long lingerMs = Long.MAX_VALUE;
+ int lingerMs = Integer.MAX_VALUE;
int numRecords = 100;
final AtomicInteger numExceptionReceivedInCallback = new
AtomicInteger(0);
final RecordAccumulator accum = createTestRecordAccumulator(
- 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, lingerMs);
+ 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, lingerMs);
class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception
exception) {
@@ -468,7 +474,7 @@ public class RecordAccumulatorTest {
@Test
public void testAbortUnsentBatches() throws Exception {
- long lingerMs = Long.MAX_VALUE;
+ int lingerMs = Integer.MAX_VALUE;
int numRecords = 100;
final AtomicInteger numExceptionReceivedInCallback = new
AtomicInteger(0);
@@ -509,17 +515,65 @@ public class RecordAccumulatorTest {
assertTrue(accum.hasIncomplete());
}
+ private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException {
+ long lingerMs = 300L;
+ List<Boolean> muteStates = Arrays.asList(false, true);
+ Set<Node> readyNodes = null;
+ List<ProducerBatch> expiredBatches = new ArrayList<>();
+ // test case assumes that the records do not fill the batch completely
+ int batchSize = 1025;
+ RecordAccumulator accum = createTestRecordAccumulator(deliveryTimeoutMs,
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+ // Make the batches ready due to linger. These batches are not in retry
+ for (Boolean mute: muteStates) {
+ if (time.milliseconds() < System.currentTimeMillis())
+ time.setCurrentTimeMs(System.currentTimeMillis());
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+ assertEquals("No partition should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+
+ time.sleep(lingerMs);
+ readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("The batch should not expire when just linger has passed", 0, expiredBatches.size());
+
+ if (mute)
+ accum.mutePartition(tp1);
+ else
+ accum.unmutePartition(tp1, 0L);
+
+ // Advance the clock to expire the batch.
+ time.sleep(deliveryTimeoutMs - lingerMs);
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("The batch may expire when the partition is muted", 1, expiredBatches.size());
+ assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+ }
+ }
+
+ @Test
+ public void testExpiredBatchSingle() throws InterruptedException {
+ doExpireBatchSingle(3200L);
+ }
+
+ @Test
+ public void testExpiredBatchSingleMaxValue() throws InterruptedException {
+ doExpireBatchSingle(Long.MAX_VALUE);
+ }
+
@Test
public void testExpiredBatches() throws InterruptedException {
long retryBackoffMs = 100L;
- long lingerMs = 3000L;
+ long lingerMs = 30L;
int requestTimeout = 60;
+ long deliveryTimeoutMs = 3200L;
// test case assumes that the records do not fill the batch completely
int batchSize = 1025;
RecordAccumulator accum = createTestRecordAccumulator(
- batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+ deliveryTimeoutMs, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
int appends = expectedNumAppends(batchSize);
// Test batches not in retry
@@ -532,14 +586,14 @@ public class RecordAccumulatorTest {
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
// Advance the clock to expire the batch.
- time.sleep(requestTimeout + 1);
+ time.sleep(deliveryTimeoutMs + 1);
accum.mutePartition(tp1);
- List<ProducerBatch> expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+ List<ProducerBatch> expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("The batches will be muted no matter if the partition is muted or not", 2, expiredBatches.size());
accum.unmutePartition(tp1, 0L);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired", 1, expiredBatches.size());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("All batches should have been expired earlier", 0, expiredBatches.size());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
// Advance the clock to make the next batch ready due to linger.ms
@@ -548,12 +602,12 @@
time.sleep(requestTimeout + 1);
accum.mutePartition(tp1);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
accum.unmutePartition(tp1, 0L);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("All batches should have been expired", 0, expiredBatches.size());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
// Test batches in retry.
@@ -569,17 +623,17 @@
// test expiration.
time.sleep(requestTimeout + retryBackoffMs);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals("The batch should not be expired.", 0, expiredBatches.size());
time.sleep(1L);
accum.mutePartition(tp1);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
accum.unmutePartition(tp1, 0L);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("All batches should have been expired.", 0, expiredBatches.size());
// Test that when being throttled muted batches are expired before the throttle time is over.
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
@@ -589,20 +643,20 @@
// Advance the clock to expire the batch.
time.sleep(requestTimeout + 1);
accum.mutePartition(tp1);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
long throttleTimeMs = 100L;
accum.unmutePartition(tp1, time.milliseconds() + throttleTimeMs);
// The batch shouldn't be expired yet.
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
// Once the throttle time is over, the batch can be expired.
time.sleep(throttleTimeMs);
- expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired", 1, expiredBatches.size());
- assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("All batches should have been expired earlier", 0, expiredBatches.size());
+ assertEquals("No partitions should be ready.", 1, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
@Test
@@ -646,10 +700,18 @@
// Simulate talking to an older broker, ie. one which supports a lower magic.
ApiVersions apiVersions = new ApiVersions();
int batchSize = 1025;
+ int requestTimeoutMs = 1600;
+ long deliveryTimeoutMs = 3200L;
+ long lingerMs = 10L;
+ long retryBackoffMs = 100L;
+ long totalSize = 10 * batchSize;
+ String metricGrpName = "producer-metrics";
+
apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
(short) 0, (short) 2))));
- RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
- CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager());
+ RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+ CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
+ new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
}
@@ -727,9 +789,9 @@ public class RecordAccumulatorTest {
assertFalse(drained.get(node1.id()).isEmpty());
}
assertTrue("All the batches should have been drained.",
- accum.ready(cluster,
time.milliseconds()).readyNodes.isEmpty());
+ accum.ready(cluster,
time.milliseconds()).readyNodes.isEmpty());
assertEquals("The split batches should be allocated off the
accumulator",
- bufferCapacity, accum.bufferPoolAvailableMemory());
+ bufferCapacity, accum.bufferPoolAvailableMemory());
}
@Test
@@ -760,8 +822,78 @@ public class RecordAccumulatorTest {
numSplit += result.numSplit;
numBatches += result.numBatches;
assertTrue(String.format("Total num batches = %d, split batches =
%d, more than 10%% of the batch splits. "
- + "Random seed is " + seed,
- numBatches, numSplit), (double) numSplit
/ numBatches < 0.1f);
+ + "Random seed is " + seed,
+ numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+ }
+ }
+
+ @Test
+ public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
+ long lingerMs = 500L;
+ int batchSize = 1025;
+
+ RecordAccumulator accum = createTestRecordAccumulator(
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+ Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertTrue(drained.isEmpty());
+ //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+ // advanced clock and send one batch out but it should not be included in soon to expire inflight
+ // batches because batch's expiry is quite far.
+ time.sleep(lingerMs + 1);
+ readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("A batch did not drain after linger", 1, drained.size());
+ //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
+
+ // Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
+ accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+ time.sleep(lingerMs * 4);
+
+ // Now drain and check that accumulator picked up the drained batch because its expiry is soon.
+ readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("A batch did not drain after linger", 1, drained.size());
+ }
+
+ @Test
+ public void testExpiredBatchesRetry() throws InterruptedException {
+ int lingerMs = 3000;
+ int rtt = 1000;
+ int deliveryTimeoutMs = 3200;
+ Set<Node> readyNodes;
+ List<ProducerBatch> expiredBatches;
+ List<Boolean> muteStates = Arrays.asList(false, true);
+
+ // test case assumes that the records do not fill the batch completely
+ int batchSize = 1025;
+ RecordAccumulator accum = createTestRecordAccumulator(
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
+
+ // Test batches in retry.
+ for (Boolean mute: muteStates) {
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
+ time.sleep(lingerMs);
+ readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("There should be only one batch.", 1, drained.get(node1.id()).size());
+ time.sleep(rtt);
+ accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+
+ if (mute)
+ accum.mutePartition(tp1);
+ else
+ accum.unmutePartition(tp1, 0L);
+
+ // test expiration
+ time.sleep(deliveryTimeoutMs - rtt);
+ accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds());
+ expiredBatches = accum.expiredBatches(time.milliseconds());
+ assertEquals("RecordAccumulator has expired batches if the partition is not muted", mute ? 1 : 0, expiredBatches.size());
}
}
@@ -852,7 +984,7 @@ public class RecordAccumulatorTest {
int offsetDelta = 0;
while (true) {
int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, key.length, value.length,
- Record.EMPTY_HEADERS);
+ Record.EMPTY_HEADERS);
if (size + recordSize > batchSize)
return offsetDelta;
offsetDelta += 1;
@@ -860,20 +992,32 @@ public class RecordAccumulatorTest {
}
}
+
+ private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
+ long deliveryTimeoutMs = 3200L;
+ return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
+ }
+
/**
* Return a test RecordAccumulator instance
*/
- private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
+ private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) {
+ long retryBackoffMs = 100L;
+ int requestTimeoutMs = 1600;
+ String metricGrpName = "producer-metrics";
+
return new RecordAccumulator(
- logContext,
- batchSize,
- totalSize,
- type,
- lingerMs,
- 100L,
- metrics,
- time,
- new ApiVersions(),
- null);
+ logContext,
+ batchSize,
+ type,
+ lingerMs,
+ retryBackoffMs,
+ deliveryTimeoutMs,
+ metrics,
+ metricGrpName,
+ time,
+ new ApiVersions(),
+ null,
+ new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index d87c8f9..2fbe3df 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,6 +16,21 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
@@ -62,6 +77,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
@@ -69,25 +85,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SenderTest {
@@ -131,10 +132,12 @@ public class SenderTest {
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
assertTrue(client.hasInFlightRequests());
client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
sender.run(time.milliseconds());
assertEquals("All requests completed.", 0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
assertFalse(client.hasInFlightRequests());
sender.run(time.milliseconds());
assertTrue("Request should be completed", future.isDone());
@@ -328,33 +331,42 @@
Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
client.disconnect(id);
assertEquals(0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
assertFalse("Client ready status should be false", client.isReady(node, 0L));
+ // the batch is in accumulator.inFlightBatches until it expires
+ assertEquals(1, sender.inFlightBatches(tp0).size());
sender.run(time.milliseconds()); // receive error
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // resend
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
long offset = 0;
client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
sender.run(time.milliseconds());
assertTrue("Request should have retried and completed", future.isDone());
assertEquals(offset, future.get().offset());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
// do an unsuccessful retry
future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send produce request
+ assertEquals(1, sender.inFlightBatches(tp0).size());
for (int i = 0; i < maxRetries + 1; i++) {
client.disconnect(client.requests().peek().destination());
sender.run(time.milliseconds()); // receive error
+ assertEquals(0, sender.inFlightBatches(tp0).size());
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // resend
+ assertEquals(i > 0 ? 0 : 1, sender.inFlightBatches(tp0).size());
}
sender.run(time.milliseconds());
assertFutureFailure(future, NetworkException.class);
+ assertEquals(0, sender.inFlightBatches(tp0).size());
} finally {
m.close();
}
@@ -371,7 +383,7 @@ public class SenderTest {
senderMetrics, time, REQUEST_TIMEOUT, 50, null,
apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and
partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
- metadata.update(cluster1, Collections.<String>emptySet(),
time.milliseconds());
+ metadata.update(cluster1, Collections.emptySet(),
time.milliseconds());
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
@@ -384,6 +396,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
assertTrue("Client ready status should be true",
client.isReady(node, 0L));
+ assertEquals(1, sender.inFlightBatches(tp2).size());
time.sleep(900);
// Now send another message to tp2
@@ -391,11 +404,13 @@ public class SenderTest {
// Update metadata before sender receives response from broker 0.
Now partition 2 moves to broker 0
Cluster cluster2 = TestUtils.singletonCluster("test", 2);
- metadata.update(cluster2, Collections.<String>emptySet(),
time.milliseconds());
+ metadata.update(cluster2, Collections.emptySet(),
time.milliseconds());
// Sender should not send the second message to node 0.
- sender.run(time.milliseconds());
+ assertEquals(1, sender.inFlightBatches(tp2).size());
+ sender.run(time.milliseconds()); // receive the response for the
previous send, and send the new batch
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
+ assertEquals(1, sender.inFlightBatches(tp2).size());
} finally {
m.close();
}
@@ -429,14 +444,18 @@ public class SenderTest {
// Advance the clock to expire the first batch.
time.sleep(10000);
+
+ Node clusterNode = this.cluster.nodes().get(0);
+ Map<Integer, List<ProducerBatch>> drainedBatches =
+ accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+ sender.addToInflightBatches(drainedBatches);
+
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
- Node clusterNode = this.cluster.nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
sender.run(time.milliseconds()); // We should try to flush the batch, but we expire it instead without sending anything.
-
assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
assertNull("Unexpected exception", unexpectedException.get());
// Make sure that the reconds were appended back to the batch.
@@ -463,6 +482,7 @@ public class SenderTest {
sender.run(time.milliseconds());
assertEquals("Request completed.", 0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
sender.run(time.milliseconds());
assertTrue("Request should be completed", future.isDone());
@@ -479,6 +499,7 @@ public class SenderTest {
sender.run(time.milliseconds());
assertEquals("Request completed.", 0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
sender.run(time.milliseconds());
assertTrue("Request should be completed", future.isDone());
}
@@ -520,6 +541,7 @@ public class SenderTest {
Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
assertTrue("Client ready status should be true", client.isReady(node,
0L));
assertFalse(future.isDone());
@@ -583,6 +605,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 1
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertFalse(client.hasInFlightRequests());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
assertTrue(request2.isDone());
assertEquals(1, request2.get().offset());
}
@@ -654,11 +677,12 @@ public class SenderTest {
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
-
-
assertFalse(client.hasInFlightRequests());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+
sender.run(time.milliseconds()); // send request 2;
assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
sender.run(time.milliseconds()); // receive response 2
@@ -667,17 +691,19 @@ public class SenderTest {
assertEquals(1, request2.get().offset());
assertFalse(client.hasInFlightRequests());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
sender.run(time.milliseconds()); // send request 3
assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
sender.run(time.milliseconds()); // receive response 3, send request
4 since we are out of 'retry' mode.
assertEquals(2, transactionManager.lastAckedSequence(tp0));
assertTrue(request3.isDone());
assertEquals(2, request3.get().offset());
-
assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
sender.run(time.milliseconds()); // receive response 4
@@ -795,7 +821,6 @@ public class SenderTest {
setupWithTransactionState(transactionManager);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
-
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
@@ -965,46 +990,54 @@ public class SenderTest {
public void
testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed()
throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
- setupWithTransactionState(transactionManager);
+ setupWithTransactionState(transactionManager, false, null);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
-
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send request
+ // We separate the two appends by 1 second so that the two batches
+ // don't expire at the same time.
+ time.sleep(1000L);
Future<RecordMetadata> request2 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send request
-
assertEquals(2, client.inFlightRequestCount());
+ assertEquals(2, sender.inFlightBatches(tp0).size());
sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
sender.run(time.milliseconds()); // receive first response
+ assertEquals(1, sender.inFlightBatches(tp0).size());
Node node = this.cluster.nodes().get(0);
- time.sleep(10000L);
+ // We add 600 millis to expire the first batch but not the second.
+ // Note deliveryTimeoutMs is 1500.
+ time.sleep(600L);
client.disconnect(node.idString());
client.blackout(node, 10);
sender.run(time.milliseconds()); // now expire the first batch.
assertFutureFailure(request1, TimeoutException.class);
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+
// let's enqueue another batch, which should not be dequeued until the
unresolved state is clear.
Future<RecordMetadata> request3 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
-
time.sleep(20);
-
assertFalse(request2.isDone());
sender.run(time.milliseconds()); // send second request
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+
sender.run(time.milliseconds()); // receive second response, the third
request shouldn't be sent since we are in an unresolved state.
assertTrue(request2.isDone());
assertEquals(1, request2.get().offset());
- Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+ Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
assertEquals(1, batches.size());
assertFalse(batches.peekFirst().hasSequence());
assertFalse(client.hasInFlightRequests());
@@ -1017,6 +1050,7 @@ public class SenderTest {
assertEquals(0, batches.size());
assertEquals(1, client.inFlightRequestCount());
assertFalse(request3.isDone());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
}
@Test
@@ -1026,13 +1060,13 @@ public class SenderTest {
setupWithTransactionState(transactionManager);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
-
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send request
+ time.sleep(1000L);
Future<RecordMetadata> request2 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send request
@@ -1042,7 +1076,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive first response
Node node = this.cluster.nodes().get(0);
- time.sleep(10000L);
+ time.sleep(1000L);
client.disconnect(node.idString());
client.blackout(node, 10);
@@ -1053,9 +1087,7 @@ public class SenderTest {
Future<RecordMetadata> request3 = accumulator.append(tp0,
time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
time.sleep(20);
-
assertFalse(request2.isDone());
-
sender.run(time.milliseconds()); // send second request
sendIdempotentProducerResponse(1, tp0,
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
sender.run(time.milliseconds()); // receive second response, the third
request shouldn't be sent since we are in an unresolved state.
@@ -1087,12 +1119,12 @@ public class SenderTest {
Future<RecordMetadata> request1 = accumulator.append(tp0, 0L,
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send request
sendIdempotentProducerResponse(0, tp0,
Errors.NOT_LEADER_FOR_PARTITION, -1);
- sender.run(time.milliseconds()); // receive response
+ sender.run(time.milliseconds()); // receive response
assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
Node node = this.cluster.nodes().get(0);
- time.sleep(10000L);
+ time.sleep(15000L);
client.disconnect(node.idString());
client.blackout(node, 10);
@@ -1520,7 +1552,6 @@ public class SenderTest {
RecordBatch firstBatch = batchIterator.next();
assertFalse(batchIterator.hasNext());
assertEquals(expectedSequence, firstBatch.baseSequence());
-
return true;
}
}, produceResponse(tp, responseOffset, responseError, 0,
logStartOffset));
@@ -1754,11 +1785,13 @@ public class SenderTest {
sender.run(time.milliseconds()); // send.
assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
client.respond(produceResponse(tp0, 0,
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
sender.run(time.milliseconds());
assertTrue(responseFuture.isDone());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
assertFalse("Expected transaction state to be reset upon receiving an
OutOfOrderSequenceException", transactionManager.hasProducerId());
}
@@ -1794,11 +1827,15 @@ TopicPartition tp) throws Exception {
int maxRetries = 1;
String topic = tp.topic();
+ long deliveryTimeoutMs = 3000L;
+ long totalSize = 1024 * 1024;
+ String metricGrpName = "producer-metrics";
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
- accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
- new ApiVersions(), txnManager);
+ accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+ 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+ new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
@@ -1865,9 +1902,153 @@
assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
+ assertTrue("There should be a split", m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+ }
+ }
+
+ @Test
+ public void testNoDoubleDeallocation() throws Exception {
+ long deliverTimeoutMs = 1500L;
+ long totalSize = 1024 * 1024;
+ String metricGrpName = "producer-custom-metrics";
+ MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+ setupWithTransactionState(null, false, pool);
- assertTrue("There should be a split",
- m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+ // Send first ProduceRequest
+ Future<RecordMetadata> request1 =
+ accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // send request
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+
+ time.sleep(deliverTimeoutMs);
+ assertFalse(pool.allMatch());
+
+ sender.run(time.milliseconds()); // expire the batch
+ assertTrue(request1.isDone());
+ assertTrue("The batch should have been de-allocated", pool.allMatch());
+ assertTrue(pool.allMatch());
+
+ sender.run(time.milliseconds());
+ assertTrue("The batch should have been de-allocated", pool.allMatch());
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+ }
+
+ @Test
+ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
+ long deliveryTimeoutMs = 1500L;
+ setupWithTransactionState(null, true, null);
+
+ // Send first ProduceRequest
+ Future<RecordMetadata> request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // send request
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
+
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+ responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+ client.respond(new ProduceResponse(responseMap));
+
+ time.sleep(deliveryTimeoutMs);
+ sender.run(time.milliseconds()); // receive first response
+ assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
+ try {
+ request.get();
+ fail("The expired batch should throw a TimeoutException");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
+ @Test
+ public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
+ long deliveryTimeoutMs = 1500L;
+ setupWithTransactionState(null, true, null);
+
+ // Send first ProduceRequest
+ accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+ sender.run(time.milliseconds()); // send request
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+
+ time.sleep(deliveryTimeoutMs / 2);
+
+ // Send second ProduceRequest
+ accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+ sender.run(time.milliseconds()); // must not send request because the partition is muted
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+
+ time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
+
+ client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+ sender.run(time.milliseconds()); // receive response (offset=0)
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+
+ sender.run(time.milliseconds()); // Drain the second request only this time
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+ }
+
+ @Test
+ public void testExpiredBatchDoesNotRetry() throws Exception {
+ long deliverTimeoutMs = 1500L;
+ setupWithTransactionState(null, false, null);
+
+ // Send first ProduceRequest
+ Future<RecordMetadata> request1 =
+ accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
+ MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // send request
+ assertEquals(1, client.inFlightRequestCount());
+ time.sleep(deliverTimeoutMs);
+
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+ responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+ client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
+
+ sender.run(time.milliseconds()); // expire the batch
+ assertTrue(request1.isDone());
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+
+ sender.run(time.milliseconds()); // receive first response and do not reenqueue.
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+
+ sender.run(time.milliseconds()); // run again and must not send anything.
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+ }
+
+ private class MatchingBufferPool extends BufferPool {
+ IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
+
+ MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, Time time, String metricGrpName) {
+ super(totalSize, batchSize, metrics, time, metricGrpName);
+ allocatedBuffers = new IdentityHashMap<>();
+ }
+
+ @Override
+ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
+ ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
+ allocatedBuffers.put(buffer, Boolean.TRUE);
+ return buffer;
+ }
+
+ @Override
+ public void deallocate(ByteBuffer buffer, int size) {
+ if (!allocatedBuffers.containsKey(buffer)) {
+ throw new IllegalStateException("Deallocating a buffer that is not allocated");
+ }
+ allocatedBuffers.remove(buffer);
+ super.deallocate(buffer, size);
+ }
+
+ public boolean allMatch() {
+ return allocatedBuffers.isEmpty();
}
}
@@ -1931,17 +2112,29 @@
}
private void setupWithTransactionState(TransactionManager transactionManager) {
+ setupWithTransactionState(transactionManager, false, null);
+ }
+
+ private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+ long totalSize = 1024 * 1024;
+ String metricGrpName = "producer-metrics";
Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.metrics = new Metrics(metricConfig, time);
- this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
- apiVersions, transactionManager);
- this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+ BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
+ setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool);
+ }
- this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
- this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+ private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
+ long deliveryTimeoutMs = 1500L;
+ String metricGrpName = "producer-metrics";
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+ deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
+ this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
+ this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
+ Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+ this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds());
}
private void assertSendFailure(Class<? extends RuntimeException>
expectedError) throws Exception {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 558ec72..550d003 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,6 +118,10 @@ public class TransactionManagerTest {
Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
int batchSize = 16 * 1024;
+ int requestTimeoutMs = 1500;
+ long deliveryTimeoutMs = 3000L;
+ long totalSize = 1024 * 1024;
+ String metricGrpName = "producer-metrics";
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.brokerNode = new Node(0, "localhost", 2211);
this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
@@ -125,7 +129,7 @@ public class TransactionManagerTest {
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
- this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7291d4f..1f62103 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -139,6 +139,7 @@ public class Worker {
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+ producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
// User-specified overrides
producerProps.putAll(config.originalsWithPrefix("producer."));
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index dc4041f..739675e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -68,7 +68,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.tearDown()
}
- protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+ protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Int = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
registerProducer(producer)
@@ -170,13 +170,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def testSendCompressedMessageWithCreateTime() {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@Test
def testSendNonCompressedMessageWithCreateTime() {
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@@ -409,7 +409,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testFlush() {
- val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
try {
createTopic(topic, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -438,7 +438,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// Test closing from caller thread.
for (_ <- 0 until 50) {
- val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(0, TimeUnit.MILLISECONDS)
@@ -478,7 +478,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
for (i <- 0 until 50) {
- val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
try {
// send message to partition 0
// Only send the records in the first callback since we close the producer in the callback and no records
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 02396dd..ba4df7d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -617,9 +617,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
- producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
+ producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString)
val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
- saslProperties = clientSaslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+ saslProperties = clientSaslProperties, retries = 0, lingerMs = Int.MaxValue, props = Some(producerProps))
(0 until numRecords).foreach { i =>
producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 929dbe4..1da6f9e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -45,7 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
def testBatchSizeZero() {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
sendAndVerify(producer)
}
@@ -53,13 +53,13 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
def testSendCompressedMessageWithLogAppendTime() {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
@Test
def testSendNonCompressedMessageWithLogAppendTime() {
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 9b77c2d..0227690 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
- producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
- producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 49553e8..61d5919 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1368,11 +1368,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
private var _retries = 0
private var _acks = -1
- private var _requestTimeoutMs = 30000L
+ private var _requestTimeoutMs = 30000
def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
- def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
+ def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
override def build(): KafkaProducer[String, String] = {
val producer = TestUtils.createProducer(bootstrapServers,
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 67f33eb..57aca1e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -204,7 +204,7 @@ class FetchRequestTest extends BaseRequestTest {
val propsOverride = new Properties
propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 5, lingerMs = Long.MaxValue,
+ retries = 5, lingerMs = Int.MaxValue,
keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
val bytes = new Array[Byte](msgValueLen)
val futures = try {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cf60c78..aa902f2 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -548,8 +548,8 @@ object TestUtils extends Logging {
maxBlockMs: Long = 60 * 1000L,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0,
- lingerMs: Long = 0,
- requestTimeoutMs: Long = 30 * 1000L,
+ lingerMs: Int = 0,
+ requestTimeoutMs: Int = 30 * 1000,
securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
saslProperties: Option[Properties] = None,
@@ -564,6 +564,11 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
+
+ // In case of overflow set maximum possible value for deliveryTimeoutMs
+ val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue else lingerMs + requestTimeoutMs
+ producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
/* Only use these if not already set */
val defaultProps = Map(
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ccfab95..ac1388e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -30,6 +30,11 @@
offset retention period (or the one set by broker) has passed since their last commit.</li>
<li>The default for console consumer's <code>enable.auto.commit</code> property when no <code>group.id</code> is provided is now set to <code>false</code>.
This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.</li>
+ <li>The default value for the producer's <code>retries</code> config was changed to <code>Integer.MAX_VALUE</code>, as we introduced <code>delivery.timeout.ms</code>
+ in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
+ which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default,
+ the delivery timeout is set to 2 minutes.
+ </li>
</ol>
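
Usage note (illustrative only, not part of this patch): a minimal sketch of a producer configured with the new delivery.timeout.ms described above. The broker address, topic, and values are placeholders, and delivery.timeout.ms is expected to be at least linger.ms plus request.timeout.ms.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeliveryTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Upper bound on the time between send() returning and the send being reported
        // as success or failure (KIP-91); 120000 ms matches the documented 2-minute default.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        // retries now defaults to Integer.MAX_VALUE; retrying is bounded by the delivery timeout instead.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    System.err.println("Not delivered within delivery.timeout.ms: " + exception);
            });
        }
    }
}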